Skip to content

API Reference

This API reference provides comprehensive documentation for all public classes and functions in the BAM Masterdata package. For more detailed examples and usage patterns, see the How-to Guides and Tutorial sections.

bam_masterdata.metadata.entities

BaseEntity

Bases: BaseModel

Base class used to define ObjectType and VocabularyType classes. It extends the BaseModel adding new methods that are useful for interfacing with openBIS.

Source code in bam_masterdata/metadata/entities.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
class BaseEntity(BaseModel):
    """
    Base class used to define `ObjectType` and `VocabularyType` classes. It extends the `BaseModel`
    adding new methods that are useful for interfacing with openBIS.
    """

    def __init__(self, **kwargs):
        super().__init__()

        # We store the `_property_metadata` during instantiation of the class
        self._property_metadata = self.get_property_metadata()

        for key, value in kwargs.items():
            setattr(self, key, value)

    def __setattr__(self, key, value):
        if key == "_property_metadata":
            super().__setattr__(key, value)
            return

        if key in self._property_metadata:
            # TODO add CONTROLLEDVOCABULARY and OBJECT cases
            expected_type = self._property_metadata[key].data_type.pytype
            if expected_type and not isinstance(value, expected_type):
                raise TypeError(
                    f"Invalid type for '{key}': Expected {expected_type.__name__}, got {type(value).__name__}"
                )

        # TODO add check if someone tries to set up a definition instead of an assigned property

        object.__setattr__(self, key, value)

    def __repr__(self):
        # Filter for attributes that are `PropertyTypeAssignment` and set to a finite value
        class_prop_name = None
        fields = []
        for key, metadata in self._property_metadata.items():
            if isinstance(metadata, PropertyTypeAssignment):
                value = getattr(self, key, None)
                # Only include set attributes
                if value is not None and not isinstance(value, PropertyTypeAssignment):
                    if key == "name":
                        class_prop_name = value
                    fields.append(f"{key}={repr(value)}")

        # Format the output
        class_name = self.cls_name
        if class_prop_name:  # adding `name` if available
            class_name = f"{class_prop_name}:{class_name}"
        return f"{class_name}({', '.join(fields)})"

    # Overwriting the __str__ method to use the same representation as __repr__
    __str__ = __repr__

    @property
    def cls_name(self) -> str:
        """
        Returns the entity name of the class as a string to speed up checks. This is a property
        to be overwritten by each of the abstract entity types.
        """
        return self.__class__.__name__

    @property
    def _base_attrs(self) -> list:
        """
        List of base properties or terms assigned to an entity type. This are the direct properties or terms
        assigned when defining a new entity type.
        """
        cls_attrs = self.__class__.__dict__
        base_attrs = [
            attr_name
            for attr_name in cls_attrs
            if not (
                attr_name.startswith("_")
                or callable(cls_attrs[attr_name])
                or attr_name
                in ["defs", "model_config", "model_fields", "model_computed_fields"]
            )
        ]
        return [getattr(self, attr_name) for attr_name in base_attrs]

    def _to_openbis(
        self,
        logger: "BoundLoggerLazyProxy",
        openbis: "Openbis",
        type: str,
        type_map: dict,
        get_type: Callable[..., Any],
        create_type: Callable[..., Any],
    ) -> None:
        """
        Simplified function to add or update the entity type in openBIS.
        """
        # Get all existing entities from openBIS
        openbis_entities = getattr(
            OpenbisEntities(url=openbis.url), f"get_{type}_dict"
        )()
        defs = getattr(self, "defs")

        is_vocab = isinstance(self, VocabularyType)

        # Check if the entity already exists
        if defs.code in openbis_entities:
            logger.info(f"Entity '{defs.code}' already exists in openBIS.")
            # Retrieve the existing entity
            entity = get_type(openbis, defs.code)
            # entity = openbis_entities[defs.code]

            # Get properties from self and openBIS
            self_properties = getattr(self, "terms" if is_vocab else "properties", [])
            obis_properties = (
                entity.get_terms().df.code
                if is_vocab
                else entity.get_property_assignments()
            )
            obis_property_codes = [
                prop.code if not is_vocab else prop for prop in obis_properties
            ]

            # Check for properties in self that are not in openBIS
            new_properties_added = False
            for prop in self_properties:
                if prop.code not in obis_property_codes:
                    logger.info(
                        f"Adding new '{'term' if is_vocab else 'property'}' {prop.code}' to entity '{defs.code}'."
                    )
                    new_properties_added = True

                    # Handle special case for OBJECT or SAMPLE data types
                    if not is_vocab and (
                        prop.data_type == "OBJECT" or prop.data_type == "SAMPLE"
                    ):
                        prop.data_type = "SAMPLE"

                    # Assign the term or property to the entity
                    if is_vocab:
                        term = openbis.new_term(
                            code=prop.code,
                            vocabularyCode=defs.code,
                            label=prop.label,
                            description=prop.description,
                        )
                        if prop.official:
                            term.official = prop.official
                        term.save()
                    else:
                        if prop.vocabulary_code:
                            entity.assign_property(
                                prop=prop.code,
                                section=prop.section,
                                mandatory=prop.mandatory,
                                showInEditView=prop.show_in_edit_views,
                                vocabulary=prop.vocabulary_code,
                            )
                        else:
                            entity.assign_property(
                                prop=prop.code,
                                section=prop.section,
                                mandatory=prop.mandatory,
                                showInEditView=prop.show_in_edit_views,
                            )

            if not new_properties_added:
                logger.info(
                    f"No new '{'terms' if is_vocab else 'properties'}' added to entity '{defs.code}'."
                )

            # Save the entity after adding new properties
            if not is_vocab:
                entity.save()
            return entity

        # If the entity is new, create it
        logger.info(f"Creating new entity '{defs.code}' in openBIS.")
        if not is_vocab:
            entity = create_type(openbis, defs)
            entity.save()

            # Assign properties to the new entity
            properties = getattr(self, "properties", [])
            for prop in properties:
                logger.info(f"Adding new property {prop.code} to {defs.code}.")
                # Handle special case for OBJECT or SAMPLE data types
                if prop.data_type == "OBJECT" or prop.data_type == "SAMPLE":
                    prop.data_type = "SAMPLE"

                # Assign the property to the entity
                entity.assign_property(
                    prop=prop.code,
                    section=prop.section,
                    mandatory=prop.mandatory,
                    showInEditView=prop.show_in_edit_views,
                )
        else:
            # Transform the list of VocabularyTerm objects into the desired format
            terms = [
                {
                    "code": term.code,
                    "label": term.label,
                    "description": term.description,
                }
                for term in getattr(self, "terms", [])
            ]
            term_codes = ", ".join([term.code for term in getattr(self, "terms", [])])
            logger.info(f"Adding new terms {term_codes} to {defs.code}.")
            entity = create_type(openbis, defs, terms)
            entity.save()

        # Save the entity after assigning properties
        if not is_vocab:
            entity.save()
        return entity

    def get_property_metadata(self) -> dict:
        """
        Dictionary containing the metadata of the properties assigned to the entity type.

        Returns:
            dict: A dictionary containing the keys of the `PropertyTypeAssignment` attribute names and the
            values of the definitions of `PropertyTypeAssignment`. Example:
            {
                "name": PropertyTypeAssignment(
                    code="$NAME",
                    data_type=VARCHAR,
                    mandatory=True,
                    property_label="Name"
                ),
                "age": PropertyTypeAssignment(
                    code="AGE",
                    data_type=INTEGER,
                    mandatory=False,
                    property_label="Age"
                ),
            }
        """
        cls_attrs = self.__class__.__dict__

        # Store property metadata at class level
        prop_meta_dict: dict = {}
        for base in type(self).__mro__:
            cls_attrs = getattr(base, "__dict__", {})
            for attr_name, attr_value in cls_attrs.items():
                if isinstance(attr_value, PropertyTypeAssignment):
                    prop_meta_dict[attr_name] = attr_value
        return prop_meta_dict

    def to_json(self, indent: int | None = None) -> str:
        """
        Returns the entity as a string in JSON format storing the value of the properties
        assigned to the entity.

        Args:
            indent (Optional[int], optional): The indent to print in JSON. Defaults to None.

        Returns:
            str: The JSON representation of the entity.
        """
        data: dict = {}
        for key in self._property_metadata.keys():
            try:
                value = getattr(self, key)
            except AttributeError:
                continue
            if isinstance(value, PropertyTypeAssignment):
                continue
            data[key] = value
        return json.dumps(data, indent=indent)

    def to_dict(self) -> dict:
        """
        Returns the entity as a dictionary storing the value of the properties assigned to the entity.

        Returns:
            dict: The dictionary representation of the entity.
        """
        dump_json = self.to_json()
        return json.loads(dump_json)

    def to_hdf5(self, hdf_file: h5py.File, group_name: str = "") -> h5py.File:
        """
        Serialize the entity to a HDF5 file under the group specified in the input.

        Args:
            hdf_file (h5py.File): The HDF5 file to store the entity.
            group_name (str, optional): The group name to serialize the data.
        """
        if not group_name:
            group_name = self.cls_name
        group = hdf_file.create_group(group_name)

        for key in self._property_metadata.keys():
            try:
                value = getattr(self, key)
                if not value:
                    continue
                if isinstance(value, str | int | float | bool | list | tuple):
                    group.create_dataset(key, data=value)
                else:
                    raise TypeError(
                        f"Unsupported type {type(value)} for key {key} for HDF5 serialization."
                    )
            except AttributeError:
                continue

    def model_to_dict(self) -> dict:
        """
        Returns the model as a dictionary storing the data `defs` and the property or vocabulary term
        assignments.

        Returns:
            dict: The dictionary representation of the model.
        """
        data = self.model_dump()

        attr_value = getattr(self, "defs")
        if isinstance(attr_value, BaseModel):
            data["defs"] = attr_value.model_dump()
        else:
            data["defs"] = attr_value
        return data

    def model_to_json(self, indent: int | None = None) -> str:
        """
        Returns the model as a string in JSON format storing the data `defs` and the property or
        vocabulary term assignments.

        Args:
            indent (Optional[int], optional): The indent to print in JSON. Defaults to None.

        Returns:
            str: The JSON representation of the model.
        """
        # * `model_dump_json()` from pydantic does not store the `defs` section of each entity.
        data = self.model_to_dict()
        return json.dumps(data, indent=indent)

    def _add_properties_rdf(
        self,
        namespace: "Namespace",
        graph: "Graph",
        prop: "PropertyTypeAssignment",
        logger: "BoundLoggerLazyProxy",
    ) -> "URIRef":
        """
        Add the properties assigned to the entity to the RDF graph extracting the information from
        OpenBIS for the `object_code` or `vocabulary_code`.

        Args:
            namespace (Namespace): The namespace to use for the RDF graph.
            graph (Graph): The RDF graph to which the properties are added.
            prop (PropertyTypeAssignment): The property assigned to the entity.
            logger (BoundLoggerLazyProxy): The logger to log messages.

        Returns:
            URIRef: The URI reference of the property added to the RDF graph.
        """
        prop_uri = namespace[prop.id]

        # Define the property as an OWL class inheriting from PropertyType
        graph.add((prop_uri, RDF.type, OWL.Thing))
        graph.add((prop_uri, RDFS.subClassOf, namespace.PropertyType))

        # Add attributes like id, code, description in English and Deutsch, property_label, data_type
        graph.add((prop_uri, RDFS.label, Literal(prop.id, lang="en")))
        graph.add((prop_uri, DC.identifier, Literal(prop.code)))
        descriptions = prop.description.split("//")
        if len(descriptions) > 1:
            graph.add((prop_uri, RDFS.comment, Literal(descriptions[0], lang="en")))
            graph.add((prop_uri, RDFS.comment, Literal(descriptions[1], lang="de")))
        else:
            graph.add((prop_uri, RDFS.comment, Literal(prop.description, lang="en")))
        graph.add(
            (prop_uri, namespace.propertyLabel, Literal(prop.property_label, lang="en"))
        )
        graph.add((prop_uri, namespace.dataType, Literal(prop.data_type.value)))
        if prop.data_type.value == "OBJECT":
            # entity_ref_uri = BAM[code_to_class_name(obj.object_code)]
            # graph.add((prop_uri, BAM.referenceTo, entity_ref_uri))
            object_code = code_to_class_name(prop.object_code, logger)
            if not object_code:
                logger.error(
                    f"Failed to identify the `object_code` for the property {prop.id}"
                )
                return prop_uri
            entity_ref_uri = namespace[object_code]

            # Create a restriction with referenceTo
            restriction = BNode()
            graph.add((restriction, RDF.type, OWL.Restriction))
            graph.add((restriction, OWL.onProperty, namespace["referenceTo"]))
            graph.add((restriction, OWL.someValuesFrom, entity_ref_uri))

            # Add the restriction as a subclass of the property
            graph.add((prop_uri, RDFS.subClassOf, restriction))
        return prop_uri

    # skos:prefLabel used for class names
    # skos:definition used for `description` (en, de)
    # dc:identifier used for `code`  # ! only defined for internal codes with $ symbol
    # parents defined from `code`
    # assigned properties can be Mandatory or Optional, can be PropertyType or ObjectType
    # ? For OBJECT TYPES
    # ? `generated_code_prefix`, `auto_generate_codes`?
    @no_type_check
    def model_to_rdf(
        self, namespace: "Namespace", graph: "Graph", logger: "BoundLoggerLazyProxy"
    ) -> None:
        """
        Convert the entity to RDF triples and add them to the graph. The function uses the
        `_add_properties_rdf` method to convert the properties assigned to the entity to RDF triples.

        Args:
            namespace (Namespace): The namespace to use for the RDF graph.
            graph (Graph): The RDF graph to which the entity is added.
            logger (BoundLoggerLazyProxy): The logger to log messages.
        """
        entity_uri = namespace[self.defs.id]

        # Define the entity as an OWL class inheriting from the specific namespace type
        graph.add((entity_uri, RDF.type, OWL.Thing))
        parent_classes = self.__class__.__bases__
        for parent_class in parent_classes:
            if issubclass(parent_class, BaseEntity) and parent_class != BaseEntity:
                # if parent_class.__name__ in [
                #     "ObjectType",
                #     "CollectionType",
                #     "DatasetType",
                # ]:
                #     # ! add here logic of subClassOf connecting with PROV-O or BFO
                #     # ! maybe via classes instead of ObjectType/CollectionType/DatasetType?
                #     # ! Example:
                #     # !     graph.add((entity_uri, RDFS.subClassOf, "http://www.w3.org/ns/prov#Entity"))
                #     continue
                parent_uri = namespace[parent_class.__name__]
                graph.add((entity_uri, RDFS.subClassOf, parent_uri))

        # Add attributes like id, code, description in English and Deutsch, property_label, data_type
        graph.add((entity_uri, RDFS.label, Literal(self.defs.id, lang="en")))
        graph.add((entity_uri, DC.identifier, Literal(self.defs.code)))
        descriptions = self.defs.description.split("//")
        if len(descriptions) > 1:
            graph.add((entity_uri, RDFS.comment, Literal(descriptions[0], lang="en")))
            graph.add((entity_uri, RDFS.comment, Literal(descriptions[1], lang="de")))
        else:
            graph.add(
                (entity_uri, RDFS.comment, Literal(self.defs.description, lang="en"))
            )
        # Adding properties relationships to the entities
        for assigned_prop in self._base_attrs:
            prop_uri = self._add_properties_rdf(namespace, graph, assigned_prop, logger)
            restriction = BNode()
            graph.add((restriction, RDF.type, OWL.Restriction))
            if assigned_prop.mandatory:
                graph.add(
                    (restriction, OWL.onProperty, namespace["hasMandatoryProperty"])
                )
            else:
                graph.add(
                    (restriction, OWL.onProperty, namespace["hasOptionalProperty"])
                )
            graph.add((restriction, OWL.someValuesFrom, prop_uri))

            # Add the restriction as a subclass of the entity
            graph.add((entity_uri, RDFS.subClassOf, restriction))

__str__ = __repr__

cls_name

Returns the entity name of the class as a string to speed up checks. This is a property to be overwritten by each of the abstract entity types.

__init__(**kwargs)

Source code in bam_masterdata/metadata/entities.py
def __init__(self, **kwargs):
    super().__init__()

    # We store the `_property_metadata` during instantiation of the class
    self._property_metadata = self.get_property_metadata()

    for key, value in kwargs.items():
        setattr(self, key, value)

__setattr__(key, value)

Source code in bam_masterdata/metadata/entities.py
def __setattr__(self, key, value):
    if key == "_property_metadata":
        super().__setattr__(key, value)
        return

    if key in self._property_metadata:
        # TODO add CONTROLLEDVOCABULARY and OBJECT cases
        expected_type = self._property_metadata[key].data_type.pytype
        if expected_type and not isinstance(value, expected_type):
            raise TypeError(
                f"Invalid type for '{key}': Expected {expected_type.__name__}, got {type(value).__name__}"
            )

    # TODO add check if someone tries to set up a definition instead of an assigned property

    object.__setattr__(self, key, value)

__repr__()

Source code in bam_masterdata/metadata/entities.py
def __repr__(self):
    # Filter for attributes that are `PropertyTypeAssignment` and set to a finite value
    class_prop_name = None
    fields = []
    for key, metadata in self._property_metadata.items():
        if isinstance(metadata, PropertyTypeAssignment):
            value = getattr(self, key, None)
            # Only include set attributes
            if value is not None and not isinstance(value, PropertyTypeAssignment):
                if key == "name":
                    class_prop_name = value
                fields.append(f"{key}={repr(value)}")

    # Format the output
    class_name = self.cls_name
    if class_prop_name:  # adding `name` if available
        class_name = f"{class_prop_name}:{class_name}"
    return f"{class_name}({', '.join(fields)})"

get_property_metadata()

Dictionary containing the metadata of the properties assigned to the entity type.

RETURNS DESCRIPTION
dict

A dictionary containing the keys of the PropertyTypeAssignment attribute names and the

TYPE: dict

dict

values of the definitions of PropertyTypeAssignment. Example:

dict

{ "name": PropertyTypeAssignment( code="$NAME", data_type=VARCHAR, mandatory=True, property_label="Name" ), "age": PropertyTypeAssignment( code="AGE", data_type=INTEGER, mandatory=False, property_label="Age" ),

dict

}

Source code in bam_masterdata/metadata/entities.py
def get_property_metadata(self) -> dict:
    """
    Dictionary containing the metadata of the properties assigned to the entity type.

    Returns:
        dict: A dictionary containing the keys of the `PropertyTypeAssignment` attribute names and the
        values of the definitions of `PropertyTypeAssignment`. Example:
        {
            "name": PropertyTypeAssignment(
                code="$NAME",
                data_type=VARCHAR,
                mandatory=True,
                property_label="Name"
            ),
            "age": PropertyTypeAssignment(
                code="AGE",
                data_type=INTEGER,
                mandatory=False,
                property_label="Age"
            ),
        }
    """
    cls_attrs = self.__class__.__dict__

    # Store property metadata at class level
    prop_meta_dict: dict = {}
    for base in type(self).__mro__:
        cls_attrs = getattr(base, "__dict__", {})
        for attr_name, attr_value in cls_attrs.items():
            if isinstance(attr_value, PropertyTypeAssignment):
                prop_meta_dict[attr_name] = attr_value
    return prop_meta_dict

to_json(indent=None)

Returns the entity as a string in JSON format storing the value of the properties assigned to the entity.

PARAMETER DESCRIPTION
indent

The indent to print in JSON. Defaults to None.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
str

The JSON representation of the entity.

TYPE: str

Source code in bam_masterdata/metadata/entities.py
def to_json(self, indent: int | None = None) -> str:
    """
    Returns the entity as a string in JSON format storing the value of the properties
    assigned to the entity.

    Args:
        indent (Optional[int], optional): The indent to print in JSON. Defaults to None.

    Returns:
        str: The JSON representation of the entity.
    """
    data: dict = {}
    for key in self._property_metadata.keys():
        try:
            value = getattr(self, key)
        except AttributeError:
            continue
        if isinstance(value, PropertyTypeAssignment):
            continue
        data[key] = value
    return json.dumps(data, indent=indent)

to_dict()

Returns the entity as a dictionary storing the value of the properties assigned to the entity.

RETURNS DESCRIPTION
dict

The dictionary representation of the entity.

TYPE: dict

Source code in bam_masterdata/metadata/entities.py
def to_dict(self) -> dict:
    """
    Returns the entity as a dictionary storing the value of the properties assigned to the entity.

    Returns:
        dict: The dictionary representation of the entity.
    """
    dump_json = self.to_json()
    return json.loads(dump_json)

to_hdf5(hdf_file, group_name='')

Serialize the entity to a HDF5 file under the group specified in the input.

PARAMETER DESCRIPTION
hdf_file

The HDF5 file to store the entity.

TYPE: File

group_name

The group name to serialize the data.

TYPE: str DEFAULT: ''

Source code in bam_masterdata/metadata/entities.py
def to_hdf5(self, hdf_file: h5py.File, group_name: str = "") -> h5py.File:
    """
    Serialize the entity to a HDF5 file under the group specified in the input.

    Args:
        hdf_file (h5py.File): The HDF5 file to store the entity.
        group_name (str, optional): The group name to serialize the data.
    """
    if not group_name:
        group_name = self.cls_name
    group = hdf_file.create_group(group_name)

    for key in self._property_metadata.keys():
        try:
            value = getattr(self, key)
            if not value:
                continue
            if isinstance(value, str | int | float | bool | list | tuple):
                group.create_dataset(key, data=value)
            else:
                raise TypeError(
                    f"Unsupported type {type(value)} for key {key} for HDF5 serialization."
                )
        except AttributeError:
            continue

model_to_dict()

Returns the model as a dictionary storing the data defs and the property or vocabulary term assignments.

RETURNS DESCRIPTION
dict

The dictionary representation of the model.

TYPE: dict

Source code in bam_masterdata/metadata/entities.py
def model_to_dict(self) -> dict:
    """
    Returns the model as a dictionary storing the data `defs` and the property or vocabulary term
    assignments.

    Returns:
        dict: The dictionary representation of the model.
    """
    data = self.model_dump()

    attr_value = getattr(self, "defs")
    if isinstance(attr_value, BaseModel):
        data["defs"] = attr_value.model_dump()
    else:
        data["defs"] = attr_value
    return data

model_to_json(indent=None)

Returns the model as a string in JSON format storing the data defs and the property or vocabulary term assignments.

PARAMETER DESCRIPTION
indent

The indent to print in JSON. Defaults to None.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
str

The JSON representation of the model.

TYPE: str

Source code in bam_masterdata/metadata/entities.py
def model_to_json(self, indent: int | None = None) -> str:
    """
    Returns the model as a string in JSON format storing the data `defs` and the property or
    vocabulary term assignments.

    Args:
        indent (Optional[int], optional): The indent to print in JSON. Defaults to None.

    Returns:
        str: The JSON representation of the model.
    """
    # * `model_dump_json()` from pydantic does not store the `defs` section of each entity.
    data = self.model_to_dict()
    return json.dumps(data, indent=indent)

model_to_rdf(namespace, graph, logger)

Convert the entity to RDF triples and add them to the graph. The function uses the _add_properties_rdf method to convert the properties assigned to the entity to RDF triples.

PARAMETER DESCRIPTION
namespace

The namespace to use for the RDF graph.

TYPE: Namespace

graph

The RDF graph to which the entity is added.

TYPE: Graph

logger

The logger to log messages.

TYPE: BoundLoggerLazyProxy

Source code in bam_masterdata/metadata/entities.py
@no_type_check
def model_to_rdf(
    self, namespace: "Namespace", graph: "Graph", logger: "BoundLoggerLazyProxy"
) -> None:
    """
    Convert the entity to RDF triples and add them to the graph. The function uses the
    `_add_properties_rdf` method to convert the properties assigned to the entity to RDF triples.

    Args:
        namespace (Namespace): The namespace to use for the RDF graph.
        graph (Graph): The RDF graph to which the entity is added.
        logger (BoundLoggerLazyProxy): The logger to log messages.
    """
    entity_uri = namespace[self.defs.id]

    # Define the entity as an OWL class inheriting from the specific namespace type
    graph.add((entity_uri, RDF.type, OWL.Thing))
    parent_classes = self.__class__.__bases__
    for parent_class in parent_classes:
        if issubclass(parent_class, BaseEntity) and parent_class != BaseEntity:
            # if parent_class.__name__ in [
            #     "ObjectType",
            #     "CollectionType",
            #     "DatasetType",
            # ]:
            #     # ! add here logic of subClassOf connecting with PROV-O or BFO
            #     # ! maybe via classes instead of ObjectType/CollectionType/DatasetType?
            #     # ! Example:
            #     # !     graph.add((entity_uri, RDFS.subClassOf, "http://www.w3.org/ns/prov#Entity"))
            #     continue
            parent_uri = namespace[parent_class.__name__]
            graph.add((entity_uri, RDFS.subClassOf, parent_uri))

    # Add attributes like id, code, description in English and Deutsch, property_label, data_type
    graph.add((entity_uri, RDFS.label, Literal(self.defs.id, lang="en")))
    graph.add((entity_uri, DC.identifier, Literal(self.defs.code)))
    descriptions = self.defs.description.split("//")
    if len(descriptions) > 1:
        graph.add((entity_uri, RDFS.comment, Literal(descriptions[0], lang="en")))
        graph.add((entity_uri, RDFS.comment, Literal(descriptions[1], lang="de")))
    else:
        graph.add(
            (entity_uri, RDFS.comment, Literal(self.defs.description, lang="en"))
        )
    # Adding properties relationships to the entities
    for assigned_prop in self._base_attrs:
        prop_uri = self._add_properties_rdf(namespace, graph, assigned_prop, logger)
        restriction = BNode()
        graph.add((restriction, RDF.type, OWL.Restriction))
        if assigned_prop.mandatory:
            graph.add(
                (restriction, OWL.onProperty, namespace["hasMandatoryProperty"])
            )
        else:
            graph.add(
                (restriction, OWL.onProperty, namespace["hasOptionalProperty"])
            )
        graph.add((restriction, OWL.someValuesFrom, prop_uri))

        # Add the restriction as a subclass of the entity
        graph.add((entity_uri, RDFS.subClassOf, restriction))

ObjectType

Bases: BaseEntity

Base class used to define object types. All object types must inherit from this class. The object types are defined in the module bam_masterdata/object_types.py.

The ObjectType class contains a list of all properties defined for a ObjectType, for internally represent the model in other formats (e.g., JSON or Excel).

Note this is also used for CollectionType and DatasetType, as they also contain a list of properties.

Source code in bam_masterdata/metadata/entities.py
class ObjectType(BaseEntity):
    """
    Base class used to define object types. All object types must inherit from this class. The
    object types are defined in the module `bam_masterdata/object_types.py`.

    The `ObjectType` class contains a list of all `properties` defined for a `ObjectType`, for
    internally represent the model in other formats (e.g., JSON or Excel).

    Note this is also used for `CollectionType` and `DatasetType`, as they also contain a list of
    properties.
    """

    model_config = ConfigDict(
        ignored_types=(
            ObjectTypeDef,
            CollectionTypeDef,
            DatasetTypeDef,
            PropertyTypeAssignment,
        )
    )

    properties: list[PropertyTypeAssignment] = Field(
        default=[],
        description="""
        List of properties assigned to an object type. This is useful for internal representation of the model.
        """,
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Initialize the properties list to store PropertyTypeAssignment instances
        self._properties = {}
        for key, prop in self._property_metadata.items():
            self._properties[key] = prop.data_type

    def __setattr__(self, key, value):
        if key in ["_property_metadata", "_properties"]:
            super().__setattr__(key, value)
            return

        # key search in every nested class
        for base in type(self).__mro__:
            prop_meta = getattr(base, "get_property_metadata", None)
            if callable(prop_meta):
                meta = (
                    prop_meta(self)
                    if base is not type(self)
                    else self._property_metadata
                )
                if key in meta:
                    # Typcheck
                    expected_type = meta[key].data_type.pytype
                    if expected_type is datetime.datetime:
                        if isinstance(value, datetime.datetime):
                            try:
                                value = value.strftime(
                                    "%Y-%m-%d %H:%M:%S"
                                )  # create string
                                expected_type = str
                            except ValueError:
                                raise ValueError(
                                    f"Invalid datetime format for '{key}': Expected ISO format string, got '{value}'"
                                )
                        elif isinstance(value, str):
                            try:
                                datetime.datetime.fromisoformat(value)
                                expected_type = str
                            except ValueError:
                                raise ValueError(
                                    f"Invalid datetime format for '{key}': Expected ISO format string, got '{value}'"
                                )
                        else:
                            raise TypeError(
                                f"Invalid type for '{key}': Expected datetime or ISO format string, got {type(value).__name__}"
                            )
                    if expected_type and not isinstance(value, expected_type):
                        raise TypeError(
                            f"Invalid type for '{key}': Expected {expected_type.__name__}, got {type(value).__name__}"
                        )
                    #  CONTROLLEDVOCABULARY-Check
                    data_type = meta[key].data_type
                    if data_type == "CONTROLLEDVOCABULARY":
                        vocabulary_code = meta[key].vocabulary_code
                        if not vocabulary_code:
                            raise ValueError(
                                f"Property '{key}' of type CONTROLLEDVOCABULARY must have a vocabulary_code defined."
                            )
                        vocab_path = None
                        for file in listdir_py_modules(DATAMODEL_DIR):
                            if "vocabulary_types.py" in file:
                                vocab_path = file
                                break
                        if vocab_path is None:
                            raise FileNotFoundError(
                                f"The file 'vocabulary_types.py' was not found in the directory specified by {DATAMODEL_DIR}."
                            )
                        vocabulary_class = self.get_vocabulary_class(
                            vocabulary_code, vocab_path
                        )
                        if vocabulary_class is None:
                            raise ValueError(
                                f"No matching vocabulary class found for vocabulary_code '{vocabulary_code}'."
                            )
                        codes = [term.code for term in vocabulary_class.terms]
                        if value not in codes:
                            raise ValueError(
                                f"{value} for {key} is not in the list of allowed terms for vocabulary."
                            )
                    # set attribute
                    return object.__setattr__(self, key, value)

        raise KeyError(
            f"Key '{key}' not found in any property_metadata of {type(self).__name__} or its bases."
        )

    def get_vocabulary_class(
        self, vocabulary_code: str, vocab_path: str
    ) -> VocabularyType | None:
        """
        Get the class instance of the vocabulary type defined by `vocabulary_code` in the Python module
        specified by `vocab_path`.

        Args:
            vocabulary_code (str): Code of the vocabulary type to get.
            vocab_path (str): Path to the module containing the vocabulary type definitions.

        Returns:
            VocabularyType | None: The class of the vocabulary type if found, otherwise None.
        """
        module = import_module(vocab_path)
        vocabulary_class = None
        for name, obj in inspect.getmembers(module, inspect.isclass):
            if name == code_to_class_name(vocabulary_code):
                vocabulary_class = obj()
                break

        return vocabulary_class

    @property
    def base_name(self) -> str:
        """
        Returns the entity name of the class as a string.
        """
        return "ObjectType"

    @model_validator(mode="after")
    @classmethod
    def model_validator_after_init(cls, data: Any) -> Any:
        """
        Validate the model after instantiation of the class.

        Args:
            data (Any): The data containing the fields values to validate.

        Returns:
            Any: The data with the validated fields.
        """
        # Add all the properties assigned to the object type to the `properties` list.
        # TODO check if the order is properly assigned
        for base in cls.__mro__:
            for _, attr_val in base.__dict__.items():
                if isinstance(attr_val, PropertyTypeAssignment):
                    data.properties.append(attr_val)

        return data

    def to_openbis(
        self,
        logger: "BoundLoggerLazyProxy",
        openbis: "Openbis",
        type: str = "object",
        type_map: dict = OBJECT_TYPE_MAP,
    ) -> None:
        def get_type(openbis: "Openbis", code: str):
            return openbis.get_object_type(code)

        def create_type(openbis: "Openbis", defs: ObjectTypeDef):
            return openbis.new_object_type(
                code=defs.code,
                description=defs.description,
                validationPlugin=defs.validation_script,
                generatedCodePrefix=defs.generated_code_prefix,
                autoGeneratedCode=defs.auto_generate_codes,
            )

        super()._to_openbis(
            logger=logger,
            openbis=openbis,
            type=type,
            type_map=type_map,
            get_type=get_type,
            create_type=create_type,
        )

model_config = ConfigDict(ignored_types=(ObjectTypeDef, CollectionTypeDef, DatasetTypeDef, PropertyTypeAssignment))

properties = Field(default=[], description='\n List of properties assigned to an object type. This is useful for internal representation of the model.\n ')

base_name

Returns the entity name of the class as a string.

__init__(**kwargs)

Source code in bam_masterdata/metadata/entities.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)

    # Initialize the properties list to store PropertyTypeAssignment instances
    self._properties = {}
    for key, prop in self._property_metadata.items():
        self._properties[key] = prop.data_type

__setattr__(key, value)

Source code in bam_masterdata/metadata/entities.py
def __setattr__(self, key, value):
    if key in ["_property_metadata", "_properties"]:
        super().__setattr__(key, value)
        return

    # key search in every nested class
    for base in type(self).__mro__:
        prop_meta = getattr(base, "get_property_metadata", None)
        if callable(prop_meta):
            meta = (
                prop_meta(self)
                if base is not type(self)
                else self._property_metadata
            )
            if key in meta:
                # Typcheck
                expected_type = meta[key].data_type.pytype
                if expected_type is datetime.datetime:
                    if isinstance(value, datetime.datetime):
                        try:
                            value = value.strftime(
                                "%Y-%m-%d %H:%M:%S"
                            )  # create string
                            expected_type = str
                        except ValueError:
                            raise ValueError(
                                f"Invalid datetime format for '{key}': Expected ISO format string, got '{value}'"
                            )
                    elif isinstance(value, str):
                        try:
                            datetime.datetime.fromisoformat(value)
                            expected_type = str
                        except ValueError:
                            raise ValueError(
                                f"Invalid datetime format for '{key}': Expected ISO format string, got '{value}'"
                            )
                    else:
                        raise TypeError(
                            f"Invalid type for '{key}': Expected datetime or ISO format string, got {type(value).__name__}"
                        )
                if expected_type and not isinstance(value, expected_type):
                    raise TypeError(
                        f"Invalid type for '{key}': Expected {expected_type.__name__}, got {type(value).__name__}"
                    )
                #  CONTROLLEDVOCABULARY-Check
                data_type = meta[key].data_type
                if data_type == "CONTROLLEDVOCABULARY":
                    vocabulary_code = meta[key].vocabulary_code
                    if not vocabulary_code:
                        raise ValueError(
                            f"Property '{key}' of type CONTROLLEDVOCABULARY must have a vocabulary_code defined."
                        )
                    vocab_path = None
                    for file in listdir_py_modules(DATAMODEL_DIR):
                        if "vocabulary_types.py" in file:
                            vocab_path = file
                            break
                    if vocab_path is None:
                        raise FileNotFoundError(
                            f"The file 'vocabulary_types.py' was not found in the directory specified by {DATAMODEL_DIR}."
                        )
                    vocabulary_class = self.get_vocabulary_class(
                        vocabulary_code, vocab_path
                    )
                    if vocabulary_class is None:
                        raise ValueError(
                            f"No matching vocabulary class found for vocabulary_code '{vocabulary_code}'."
                        )
                    codes = [term.code for term in vocabulary_class.terms]
                    if value not in codes:
                        raise ValueError(
                            f"{value} for {key} is not in the list of allowed terms for vocabulary."
                        )
                # set attribute
                return object.__setattr__(self, key, value)

    raise KeyError(
        f"Key '{key}' not found in any property_metadata of {type(self).__name__} or its bases."
    )

get_vocabulary_class(vocabulary_code, vocab_path)

Get the class instance of the vocabulary type defined by vocabulary_code in the Python module specified by vocab_path.

PARAMETER DESCRIPTION
vocabulary_code

Code of the vocabulary type to get.

TYPE: str

vocab_path

Path to the module containing the vocabulary type definitions.

TYPE: str

RETURNS DESCRIPTION
VocabularyType | None

VocabularyType | None: The class of the vocabulary type if found, otherwise None.

Source code in bam_masterdata/metadata/entities.py
def get_vocabulary_class(
    self, vocabulary_code: str, vocab_path: str
) -> VocabularyType | None:
    """
    Get the class instance of the vocabulary type defined by `vocabulary_code` in the Python module
    specified by `vocab_path`.

    Args:
        vocabulary_code (str): Code of the vocabulary type to get.
        vocab_path (str): Path to the module containing the vocabulary type definitions.

    Returns:
        VocabularyType | None: The class of the vocabulary type if found, otherwise None.
    """
    module = import_module(vocab_path)
    vocabulary_class = None
    for name, obj in inspect.getmembers(module, inspect.isclass):
        if name == code_to_class_name(vocabulary_code):
            vocabulary_class = obj()
            break

    return vocabulary_class

model_validator_after_init(data)

Validate the model after instantiation of the class.

PARAMETER DESCRIPTION
data

The data containing the fields values to validate.

TYPE: Any

RETURNS DESCRIPTION
Any

The data with the validated fields.

TYPE: Any

Source code in bam_masterdata/metadata/entities.py
@model_validator(mode="after")
@classmethod
def model_validator_after_init(cls, data: Any) -> Any:
    """
    Validate the model after instantiation of the class.

    Args:
        data (Any): The data containing the fields values to validate.

    Returns:
        Any: The data with the validated fields.
    """
    # Add all the properties assigned to the object type to the `properties` list.
    # TODO check if the order is properly assigned
    for base in cls.__mro__:
        for _, attr_val in base.__dict__.items():
            if isinstance(attr_val, PropertyTypeAssignment):
                data.properties.append(attr_val)

    return data

to_openbis(logger, openbis, type='object', type_map=OBJECT_TYPE_MAP)

Source code in bam_masterdata/metadata/entities.py
def to_openbis(
    self,
    logger: "BoundLoggerLazyProxy",
    openbis: "Openbis",
    type: str = "object",
    type_map: dict = OBJECT_TYPE_MAP,
) -> None:
    def get_type(openbis: "Openbis", code: str):
        return openbis.get_object_type(code)

    def create_type(openbis: "Openbis", defs: ObjectTypeDef):
        return openbis.new_object_type(
            code=defs.code,
            description=defs.description,
            validationPlugin=defs.validation_script,
            generatedCodePrefix=defs.generated_code_prefix,
            autoGeneratedCode=defs.auto_generate_codes,
        )

    super()._to_openbis(
        logger=logger,
        openbis=openbis,
        type=type,
        type_map=type_map,
        get_type=get_type,
        create_type=create_type,
    )

CollectionType

Bases: ObjectType

Source code in bam_masterdata/metadata/entities.py
class CollectionType(ObjectType):
    model_config = ConfigDict(
        ignored_types=(
            ObjectTypeDef,
            ObjectType,
            CollectionTypeDef,
            PropertyTypeAssignment,
        )
    )

    attached_objects: dict[str, ObjectType] = Field(
        default={},
        exclude=True,
        description="""
        Dictionary containing the object types attached to the collection type.
        The keys are object unique identifiers and the values are the ObjectType instances.
        """,
    )

    relationships: dict[str, tuple[str, str]] = Field(
        default={},
        exclude=True,
        description="""
        Dictionary containing the relationships between the objects attached to the collection type.
        The keys are relationships unique identifiers, the values are the object unique identifiers as a
        tuple, and the order is always (parent_id, child_id).
        """,
    )

    def __repr__(self):
        return f"{self.base_name}(attached_objects={self.attached_objects}, relationships={self.relationships})"

    @property
    def base_name(self) -> str:
        """
        Returns the entity name of the class as a string.
        """
        return "CollectionType"

    def to_openbis(
        self,
        logger: "BoundLoggerLazyProxy",
        openbis: "Openbis",
        type: str = "collection",
        type_map: dict = COLLECTION_TYPE_MAP,
    ) -> None:
        def get_type(openbis: "Openbis", code: str):
            return openbis.get_collection_type(code)

        def create_type(openbis: "Openbis", defs: CollectionTypeDef):
            if defs.validation_script == "None":
                defs.validation_script = None
            if defs.validation_script:
                return openbis.new_collection_type(
                    code=defs.code,
                    description=defs.description,
                    validationPlugin=defs.validation_script,
                )
            else:
                return openbis.new_collection_type(
                    code=defs.code,
                    description=defs.description,
                    validationPlugin="",
                )

        super()._to_openbis(
            logger=logger,
            openbis=openbis,
            type=type,
            type_map=type_map,
            get_type=get_type,
            create_type=create_type,
        )

    def add(self, object_type: ObjectType) -> str:
        """
        Add an object type to the collection type.

        Args:
            object_type (ObjectType): The object type to add to the collection type.

        Returns:
            str: The unique identifier of the object type assigned in openBIS.
        """
        if not isinstance(object_type, ObjectType):
            raise TypeError(
                f"Expected an ObjectType instance, got `{type(object_type).__name__}`"
            )

        # Check mandatory properties are filled
        missing_fields = []
        for attr_name, prop in object_type._property_metadata.items():
            assigned_prop = getattr(object_type, attr_name, None)
            if prop.mandatory and isinstance(assigned_prop, PropertyTypeAssignment):
                missing_fields.append(attr_name)

        if missing_fields:
            raise ValueError(
                f"The following mandatory fields are missing for ObjectType '{object_type.cls_name}': {', '.join(missing_fields)}"
            )

        object_id = generate_object_id(object_type)
        self.attached_objects[object_id] = object_type
        return object_id

    def remove(self, object_id: str = "") -> None:
        """
        Remove an object type from the collection type by its unique identifier.

        Args:
            object_id (str, optional): The ID of the object type to be removed from the collection.
        """
        if not object_id:
            raise ValueError(
                "You must provide an `object_id` to remove the object type from the collection."
            )
        if object_id not in self.attached_objects.keys():
            raise ValueError(
                f"Object with ID '{object_id}' does not exist in the collection."
            )
        del self.attached_objects[object_id]

    def add_relationship(self, parent_id: str, child_id: str) -> str:
        """
        Add a relationship between two object types in the collection type.

        Args:
            parent_id (str): The unique identifier of the parent object type.
            child_id (str): The unique identifier of the child object type.

        Returns:
            str: The unique identifier of the relationship created, which is a concatenation of the parent
            and child IDs.
        """
        if not parent_id or not child_id:
            raise ValueError(
                "Both `parent_id` and `child_id` must be provided to add a relationship."
            )
        if (
            parent_id not in self.attached_objects.keys()
            or child_id not in self.attached_objects.keys()
        ):
            raise ValueError(
                "Both `parent_id` and `child_id` must be assigned to objects attached to the collection."
            )
        relationship_id = generate_object_relationship_id(parent_id, child_id)
        self.relationships[relationship_id] = (parent_id, child_id)
        return relationship_id

    def remove_relationship(self, relationship_id: str) -> None:
        """
        Remove a relationship from the collection type.

        Args:
            relationship_id (str): The unique identifier of the relationship to remove.
        """
        if not relationship_id:
            raise ValueError(
                "You must provide a `relationship_id` to remove the relationship from the collection type."
            )
        if relationship_id not in self.relationships.keys():
            raise ValueError(
                f"Relationship with ID '{relationship_id}' does not exist in the collection type."
            )
        del self.relationships[relationship_id]

model_config = ConfigDict(ignored_types=(ObjectTypeDef, ObjectType, CollectionTypeDef, PropertyTypeAssignment))

attached_objects = Field(default={}, exclude=True, description='\n Dictionary containing the object types attached to the collection type.\n The keys are object unique identifiers and the values are the ObjectType instances.\n ')

relationships = Field(default={}, exclude=True, description='\n Dictionary containing the relationships between the objects attached to the collection type.\n The keys are relationships unique identifiers, the values are the object unique identifiers as a\n tuple, and the order is always (parent_id, child_id).\n ')

base_name

Returns the entity name of the class as a string.

__repr__()

Source code in bam_masterdata/metadata/entities.py
def __repr__(self):
    return f"{self.base_name}(attached_objects={self.attached_objects}, relationships={self.relationships})"

to_openbis(logger, openbis, type='collection', type_map=COLLECTION_TYPE_MAP)

Source code in bam_masterdata/metadata/entities.py
def to_openbis(
    self,
    logger: "BoundLoggerLazyProxy",
    openbis: "Openbis",
    type: str = "collection",
    type_map: dict = COLLECTION_TYPE_MAP,
) -> None:
    def get_type(openbis: "Openbis", code: str):
        return openbis.get_collection_type(code)

    def create_type(openbis: "Openbis", defs: CollectionTypeDef):
        if defs.validation_script == "None":
            defs.validation_script = None
        if defs.validation_script:
            return openbis.new_collection_type(
                code=defs.code,
                description=defs.description,
                validationPlugin=defs.validation_script,
            )
        else:
            return openbis.new_collection_type(
                code=defs.code,
                description=defs.description,
                validationPlugin="",
            )

    super()._to_openbis(
        logger=logger,
        openbis=openbis,
        type=type,
        type_map=type_map,
        get_type=get_type,
        create_type=create_type,
    )

add(object_type)

Add an object type to the collection type.

PARAMETER DESCRIPTION
object_type

The object type to add to the collection type.

TYPE: ObjectType

RETURNS DESCRIPTION
str

The unique identifier of the object type assigned in openBIS.

TYPE: str

Source code in bam_masterdata/metadata/entities.py
def add(self, object_type: ObjectType) -> str:
    """
    Add an object type to the collection type.

    Args:
        object_type (ObjectType): The object type to add to the collection type.

    Returns:
        str: The unique identifier of the object type assigned in openBIS.
    """
    if not isinstance(object_type, ObjectType):
        raise TypeError(
            f"Expected an ObjectType instance, got `{type(object_type).__name__}`"
        )

    # Check mandatory properties are filled
    missing_fields = []
    for attr_name, prop in object_type._property_metadata.items():
        assigned_prop = getattr(object_type, attr_name, None)
        if prop.mandatory and isinstance(assigned_prop, PropertyTypeAssignment):
            missing_fields.append(attr_name)

    if missing_fields:
        raise ValueError(
            f"The following mandatory fields are missing for ObjectType '{object_type.cls_name}': {', '.join(missing_fields)}"
        )

    object_id = generate_object_id(object_type)
    self.attached_objects[object_id] = object_type
    return object_id

remove(object_id='')

Remove an object type from the collection type by its unique identifier.

PARAMETER DESCRIPTION
object_id

The ID of the object type to be removed from the collection.

TYPE: str DEFAULT: ''

Source code in bam_masterdata/metadata/entities.py
def remove(self, object_id: str = "") -> None:
    """
    Remove an object type from the collection type by its unique identifier.

    Args:
        object_id (str, optional): The ID of the object type to be removed from the collection.
    """
    if not object_id:
        raise ValueError(
            "You must provide an `object_id` to remove the object type from the collection."
        )
    if object_id not in self.attached_objects.keys():
        raise ValueError(
            f"Object with ID '{object_id}' does not exist in the collection."
        )
    del self.attached_objects[object_id]

add_relationship(parent_id, child_id)

Add a relationship between two object types in the collection type.

PARAMETER DESCRIPTION
parent_id

The unique identifier of the parent object type.

TYPE: str

child_id

The unique identifier of the child object type.

TYPE: str

RETURNS DESCRIPTION
str

The unique identifier of the relationship created, which is a concatenation of the parent

TYPE: str

str

and child IDs.

Source code in bam_masterdata/metadata/entities.py
def add_relationship(self, parent_id: str, child_id: str) -> str:
    """
    Add a relationship between two object types in the collection type.

    Args:
        parent_id (str): The unique identifier of the parent object type.
        child_id (str): The unique identifier of the child object type.

    Returns:
        str: The unique identifier of the relationship created, which is a concatenation of the parent
        and child IDs.
    """
    if not parent_id or not child_id:
        raise ValueError(
            "Both `parent_id` and `child_id` must be provided to add a relationship."
        )
    if (
        parent_id not in self.attached_objects.keys()
        or child_id not in self.attached_objects.keys()
    ):
        raise ValueError(
            "Both `parent_id` and `child_id` must be assigned to objects attached to the collection."
        )
    relationship_id = generate_object_relationship_id(parent_id, child_id)
    self.relationships[relationship_id] = (parent_id, child_id)
    return relationship_id

remove_relationship(relationship_id)

Remove a relationship from the collection type.

PARAMETER DESCRIPTION
relationship_id

The unique identifier of the relationship to remove.

TYPE: str

Source code in bam_masterdata/metadata/entities.py
def remove_relationship(self, relationship_id: str) -> None:
    """
    Remove a relationship from the collection type.

    Args:
        relationship_id (str): The unique identifier of the relationship to remove.
    """
    if not relationship_id:
        raise ValueError(
            "You must provide a `relationship_id` to remove the relationship from the collection type."
        )
    if relationship_id not in self.relationships.keys():
        raise ValueError(
            f"Relationship with ID '{relationship_id}' does not exist in the collection type."
        )
    del self.relationships[relationship_id]

DatasetType

Bases: ObjectType

Source code in bam_masterdata/metadata/entities.py
class DatasetType(ObjectType):
    @property
    def base_name(self) -> str:
        """
        Returns the entity name of the class as a string.
        """
        return "DatasetType"

    def to_openbis(
        self,
        logger: "BoundLoggerLazyProxy",
        openbis: "Openbis",
        type: str = "dataset",
        type_map: dict = DATASET_TYPE_MAP,
    ) -> None:
        def get_type(openbis: "Openbis", code: str):
            return openbis.get_dataset_type(code)

        def create_type(openbis: "Openbis", defs: DatasetTypeDef):
            return openbis.new_dataset_type(
                code=defs.code,
                description=defs.description,
                validationPlugin=defs.validation_script,
                # This is not accepted by openBIS when creating dataset types
                # mainDatasetPattern=defs.main_dataset_pattern,
                # mainDatasetPath=defs.main_dataset_path,
            )

        super()._to_openbis(
            logger=logger,
            openbis=openbis,
            type=type,
            type_map=type_map,
            get_type=get_type,
            create_type=create_type,
        )

base_name

Returns the entity name of the class as a string.

to_openbis(logger, openbis, type='dataset', type_map=DATASET_TYPE_MAP)

Source code in bam_masterdata/metadata/entities.py
def to_openbis(
    self,
    logger: "BoundLoggerLazyProxy",
    openbis: "Openbis",
    type: str = "dataset",
    type_map: dict = DATASET_TYPE_MAP,
) -> None:
    def get_type(openbis: "Openbis", code: str):
        return openbis.get_dataset_type(code)

    def create_type(openbis: "Openbis", defs: DatasetTypeDef):
        return openbis.new_dataset_type(
            code=defs.code,
            description=defs.description,
            validationPlugin=defs.validation_script,
            # This is not accepted by openBIS when creating dataset types
            # mainDatasetPattern=defs.main_dataset_pattern,
            # mainDatasetPath=defs.main_dataset_path,
        )

    super()._to_openbis(
        logger=logger,
        openbis=openbis,
        type=type,
        type_map=type_map,
        get_type=get_type,
        create_type=create_type,
    )

VocabularyType

Bases: BaseEntity

Base class used to define vocabulary types. All vocabulary types must inherit from this class. The vocabulary types are defined in the module bam_masterdata/vocabulary_types.py.

The VocabularyType class contains a list of all terms defined for a VocabularyType, for internally represent the model in other formats (e.g., JSON or Excel).

Source code in bam_masterdata/metadata/entities.py
class VocabularyType(BaseEntity):
    """
    Base class used to define vocabulary types. All vocabulary types must inherit from this class. The
    vocabulary types are defined in the module `bam_masterdata/vocabulary_types.py`.

    The `VocabularyType` class contains a list of all `terms` defined for a `VocabularyType`, for
    internally represent the model in other formats (e.g., JSON or Excel).
    """

    model_config = ConfigDict(ignored_types=(VocabularyTypeDef, VocabularyTerm))

    terms: list[VocabularyTerm] = Field(
        default=[],
        description="""
        List of vocabulary terms. This is useful for internal representation of the model.
        """,
    )

    @property
    def base_name(self) -> str:
        """
        Returns the entity name of the class as a string.
        """
        return "VocabularyType"

    @model_validator(mode="after")
    @classmethod
    def model_validator_after_init(cls, data: Any) -> Any:
        """
        Validate the model after instantiation of the class.

        Args:
            data (Any): The data containing the fields values to validate.

        Returns:
            Any: The data with the validated fields.
        """
        # Add all the vocabulary terms defined in the vocabulary type to the `terms` list.
        # TODO check if the order is properly assigned
        for base in cls.__mro__:
            for attr_name, attr_val in base.__dict__.items():
                if isinstance(attr_val, VocabularyTerm):
                    data.terms.append(attr_val)

        return data

    def to_openbis(
        self,
        logger: "BoundLoggerLazyProxy",
        openbis: "Openbis",
        type: str = "vocabulary",
        type_map: dict = VOCABULARY_TYPE_MAP,
    ) -> None:
        def get_type(openbis: "Openbis", code: str):
            return openbis.get_vocabulary(code)

        def create_type(openbis: "Openbis", defs: VocabularyTypeDef, terms: list):
            return openbis.new_vocabulary(
                code=defs.code, description=defs.description, terms=terms
            )

        super()._to_openbis(
            logger=logger,
            openbis=openbis,
            type=type,
            type_map=type_map,
            get_type=get_type,
            create_type=create_type,
        )

model_config = ConfigDict(ignored_types=(VocabularyTypeDef, VocabularyTerm))

terms = Field(default=[], description='\n List of vocabulary terms. This is useful for internal representation of the model.\n ')

base_name

Returns the entity name of the class as a string.

model_validator_after_init(data)

Validate the model after instantiation of the class.

PARAMETER DESCRIPTION
data

The data containing the fields values to validate.

TYPE: Any

RETURNS DESCRIPTION
Any

The data with the validated fields.

TYPE: Any

Source code in bam_masterdata/metadata/entities.py
@model_validator(mode="after")
@classmethod
def model_validator_after_init(cls, data: Any) -> Any:
    """
    Validate the model after instantiation of the class.

    Args:
        data (Any): The data containing the fields values to validate.

    Returns:
        Any: The data with the validated fields.
    """
    # Add all the vocabulary terms defined in the vocabulary type to the `terms` list.
    # TODO check if the order is properly assigned
    for base in cls.__mro__:
        for attr_name, attr_val in base.__dict__.items():
            if isinstance(attr_val, VocabularyTerm):
                data.terms.append(attr_val)

    return data

to_openbis(logger, openbis, type='vocabulary', type_map=VOCABULARY_TYPE_MAP)

Source code in bam_masterdata/metadata/entities.py
def to_openbis(
    self,
    logger: "BoundLoggerLazyProxy",
    openbis: "Openbis",
    type: str = "vocabulary",
    type_map: dict = VOCABULARY_TYPE_MAP,
) -> None:
    def get_type(openbis: "Openbis", code: str):
        return openbis.get_vocabulary(code)

    def create_type(openbis: "Openbis", defs: VocabularyTypeDef, terms: list):
        return openbis.new_vocabulary(
            code=defs.code, description=defs.description, terms=terms
        )

    super()._to_openbis(
        logger=logger,
        openbis=openbis,
        type=type,
        type_map=type_map,
        get_type=get_type,
        create_type=create_type,
    )

bam_masterdata.metadata.definitions

EntityDef

Bases: BaseModel

Abstract base class for all masterdata entity definitions. The entity definitions are immutable properties. This class provides a common interface (with common attributes like code and description.) for all entity definitions.

Source code in bam_masterdata/metadata/definitions.py
class EntityDef(BaseModel):
    """
    Abstract base class for all masterdata entity definitions. The entity definitions are immutable properties.
    This class provides a common interface (with common attributes like `code` and
    `description`.) for all entity definitions.
    """

    code: str = Field(
        ...,
        description="""
        Code string identifying the entity with an openBIS inventory definition. Note that:

        - Must be uppercase and separated by underscores, e.g. `'EXPERIMENTAL_STEP'`.
        - If the entity is native to openBIS, the code must start with a dollar sign, e.g. `'$NAME'`.
        - In the case of inheritance, it needs to be separated by dots, e.g. `'WELDING_EQUIPMENT.INSTRUMENT'`.
        """,
    )

    description: str = Field(
        ...,
        description="""
        Description of the entity. This is the human-readable text for the object and must be
        as complete and concise as possible. The German description can be added after the English
        description separated by a double slash (//), e.g. `'Chemical Substance//Chemische Substanz'`.
        """,
    )

    # TODO: check if it is necessary to add something like `ontology_annotation_id` in the future
    iri: str | None = Field(
        default=None,
        description="""
        IRI (Internationalized Resource Identifier) of the entity. This is a unique identifier for the entity
        and is used to link the entity to an ontology. It is a string with the format `"<ontology_id>:<ontology_version>"`.
        Example: "http://purl.obolibrary.org/bam-masterdata/Instrument:1.0.0".
        """,
    )

    id: str | None = Field(
        default=None,
        description="""
        Identifier of the entity defined as the class name and used to serialize the entity definitions
        in other formats.
        """,
    )

    row_location: str | None = Field(
        default=None,
        description="""
        Row in the Excel at which the entity type field is defined. It is a string with the format `"<row-letter><row_number>"`.
        Example: "A1" ot "A107". This field is useful when checking the consistency of Excel files with multiple entity
        types defined to quickly locate the specific Excel cell which logs a message when applying the `checker` CLI.
        """,
    )

    # TODO check ontology_id, ontology_version, ontology_annotation_id, internal (found in the openBIS docu)

    @field_validator("code")
    @classmethod
    def validate_code(cls, value: str) -> str:
        if not value or not re.match(r"^[\w_\$\.\-\+]+$", value):
            raise ValueError(
                "`code` must follow the rules specified in the description: 1) Must be uppercase, "
                "2) separated by underscores, 3) start with a dollar sign if native to openBIS, "
                "4) separated by dots if there is inheritance."
            )
        return value

    @field_validator("iri")
    @classmethod
    def validate_iri(cls, value: str | None) -> str | None:
        if not value:
            return value
        if not re.match(
            r"^http://purl.obolibrary.org/bam-masterdata/[\w_]+:[\d.]+$", value
        ):
            raise ValueError(
                "`iri` must follow the rules specified in the description: 1) Must start with 'http://purl.obolibrary.org/bam-masterdata/', "
                "2) followed by the entity name, 3) separated by a colon, 4) followed by the semantic versioning number. "
                "Example: 'http://purl.obolibrary.org/bam-masterdata/Instrument:1.0.0'."
            )
        return value

    @field_validator("description")
    @classmethod
    def strip_description(cls, value: str) -> str:
        return value.strip()

    @property
    def name(self) -> str:
        return self.__class__.__name__

    @property
    def excel_name(self) -> str:
        """
        Returns the name of the entity in a format suitable for the openBIS Excel file.
        """
        name_map = {
            "CollectionTypeDef": "EXPERIMENT_TYPE",
            "DatasetTypeDef": "DATASET_TYPE",
            "ObjectTypeDef": "SAMPLE_TYPE",
            "VocabularyTypeDef": "VOCABULARY_TYPE",
        }
        return name_map.get(self.name)

    @property
    def excel_headers_map(self) -> dict:
        """
        Maps the field keys of the Pydantic model into the openBIS Excel style headers.
        """
        fields = [
            k
            for k in self.model_fields.keys()
            if k not in ["iri", "id", "row_location"]
        ]
        headers: dict = {}
        for f in fields:
            headers[f] = f.replace("_", " ").capitalize()
        return headers

    @model_validator(mode="after")
    @classmethod
    def model_id(cls, data: Any) -> Any:
        """
        Stores the model `id` as the class name from the `code` field.

        Args:
            data (Any): The data containing the fields values to validate.

        Returns:
            Any: The data with the validated fields.
        """
        if "PropertyType" in data.name:
            data.id = code_to_class_name(code=data.code, entity_type="property")
        else:
            data.id = code_to_class_name(code=data.code, entity_type="object")
        return data

code = Field(..., description="\n Code string identifying the entity with an openBIS inventory definition. Note that:\n\n - Must be uppercase and separated by underscores, e.g. `'EXPERIMENTAL_STEP'`.\n - If the entity is native to openBIS, the code must start with a dollar sign, e.g. `'$NAME'`.\n - In the case of inheritance, it needs to be separated by dots, e.g. `'WELDING_EQUIPMENT.INSTRUMENT'`.\n ")

description = Field(..., description="\n Description of the entity. This is the human-readable text for the object and must be\n as complete and concise as possible. The German description can be added after the English\n description separated by a double slash (//), e.g. `'Chemical Substance//Chemische Substanz'`.\n ")

iri = Field(default=None, description='\n IRI (Internationalized Resource Identifier) of the entity. This is a unique identifier for the entity\n and is used to link the entity to an ontology. It is a string with the format `"<ontology_id>:<ontology_version>"`.\n Example: "http://purl.obolibrary.org/bam-masterdata/Instrument:1.0.0".\n ')

id = Field(default=None, description='\n Identifier of the entity defined as the class name and used to serialize the entity definitions\n in other formats.\n ')

row_location = Field(default=None, description='\n Row in the Excel at which the entity type field is defined. It is a string with the format `"<row-letter><row_number>"`.\n Example: "A1" ot "A107". This field is useful when checking the consistency of Excel files with multiple entity\n types defined to quickly locate the specific Excel cell which logs a message when applying the `checker` CLI.\n ')

name

excel_name

Returns the name of the entity in a format suitable for the openBIS Excel file.

excel_headers_map

Maps the field keys of the Pydantic model into the openBIS Excel style headers.

validate_code(value)

Source code in bam_masterdata/metadata/definitions.py
@field_validator("code")
@classmethod
def validate_code(cls, value: str) -> str:
    if not value or not re.match(r"^[\w_\$\.\-\+]+$", value):
        raise ValueError(
            "`code` must follow the rules specified in the description: 1) Must be uppercase, "
            "2) separated by underscores, 3) start with a dollar sign if native to openBIS, "
            "4) separated by dots if there is inheritance."
        )
    return value

validate_iri(value)

Source code in bam_masterdata/metadata/definitions.py
@field_validator("iri")
@classmethod
def validate_iri(cls, value: str | None) -> str | None:
    if not value:
        return value
    if not re.match(
        r"^http://purl.obolibrary.org/bam-masterdata/[\w_]+:[\d.]+$", value
    ):
        raise ValueError(
            "`iri` must follow the rules specified in the description: 1) Must start with 'http://purl.obolibrary.org/bam-masterdata/', "
            "2) followed by the entity name, 3) separated by a colon, 4) followed by the semantic versioning number. "
            "Example: 'http://purl.obolibrary.org/bam-masterdata/Instrument:1.0.0'."
        )
    return value

strip_description(value)

Source code in bam_masterdata/metadata/definitions.py
@field_validator("description")
@classmethod
def strip_description(cls, value: str) -> str:
    return value.strip()

model_id(data)

Stores the model id as the class name from the code field.

PARAMETER DESCRIPTION
data

The data containing the fields values to validate.

TYPE: Any

RETURNS DESCRIPTION
Any

The data with the validated fields.

TYPE: Any

Source code in bam_masterdata/metadata/definitions.py
@model_validator(mode="after")
@classmethod
def model_id(cls, data: Any) -> Any:
    """
    Stores the model `id` as the class name from the `code` field.

    Args:
        data (Any): The data containing the fields values to validate.

    Returns:
        Any: The data with the validated fields.
    """
    if "PropertyType" in data.name:
        data.id = code_to_class_name(code=data.code, entity_type="property")
    else:
        data.id = code_to_class_name(code=data.code, entity_type="object")
    return data

ObjectTypeDef

Bases: BaseObjectTypeDef

Definition class for an object type. It adds the fields of generated_code_prefix, auto_generate_codes, and validation_script to the common attributes of a base object type definition. E.g.:

class Instrument(BaseModel):
    defs = ObjectTypeDef(
        code='INSTRUMENT',
        description='
        Measuring Instrument//Messgerät
        ',
        generated_code_prefix='INS',
    )
Source code in bam_masterdata/metadata/definitions.py
class ObjectTypeDef(BaseObjectTypeDef):
    """
    Definition class for an object type. It adds the fields of `generated_code_prefix`, `auto_generate_codes`,
    and `validation_script` to the common attributes of a base object type definition. E.g.:

    ```python
    class Instrument(BaseModel):
        defs = ObjectTypeDef(
            code='INSTRUMENT',
            description='
            Measuring Instrument//Messger\u00e4t
            ',
            generated_code_prefix='INS',
        )
    ```
    """

    generated_code_prefix: str | None = Field(
        default=None,
        description="""
        A short prefix for the defined object type, e.g. 'CHEM'. If not specified, it is defined
        using the first 3 characters of `code`.
        """,
    )

    auto_generate_codes: bool = Field(
        True,
        description="""
        Boolean used to generate codes using `generated_code_prefix` plus a unique number. Set to
        True by default.
        """,
    )

    @model_validator(mode="after")
    @classmethod
    def model_validator_after_init(cls, data: Any) -> Any:
        """
        Validate the model after instantiation of the class.

        Args:
            data (Any): The data containing the fields values to validate.

        Returns:
            Any: The data with the validated fields.
        """
        # If `generated_code_prefix` is not set, use the first 3 characters of `code`
        if not data.generated_code_prefix:
            data.generated_code_prefix = data.code[:3]

        return data

generated_code_prefix = Field(default=None, description="\n A short prefix for the defined object type, e.g. 'CHEM'. If not specified, it is defined\n using the first 3 characters of `code`.\n ")

auto_generate_codes = Field(True, description='\n Boolean used to generate codes using `generated_code_prefix` plus a unique number. Set to\n True by default.\n ')

model_validator_after_init(data)

Validate the model after instantiation of the class.

PARAMETER DESCRIPTION
data

The data containing the fields values to validate.

TYPE: Any

RETURNS DESCRIPTION
Any

The data with the validated fields.

TYPE: Any

Source code in bam_masterdata/metadata/definitions.py
@model_validator(mode="after")
@classmethod
def model_validator_after_init(cls, data: Any) -> Any:
    """
    Validate the model after instantiation of the class.

    Args:
        data (Any): The data containing the fields values to validate.

    Returns:
        Any: The data with the validated fields.
    """
    # If `generated_code_prefix` is not set, use the first 3 characters of `code`
    if not data.generated_code_prefix:
        data.generated_code_prefix = data.code[:3]

    return data

CollectionTypeDef

Bases: BaseObjectTypeDef

Definition class for a collection type. E.g.:

class DefaultExperiment(BaseModel):
    defs = CollectionTypeDef(
        code='DEFAULT_EXPERIMENT',
        description='...',
        validation_script='DEFAULT_EXPERIMENT.date_range_validation',
    )
Source code in bam_masterdata/metadata/definitions.py
class CollectionTypeDef(BaseObjectTypeDef):
    """
    Definition class for a collection type. E.g.:

    ```python
    class DefaultExperiment(BaseModel):
        defs = CollectionTypeDef(
            code='DEFAULT_EXPERIMENT',
            description='...',
            validation_script='DEFAULT_EXPERIMENT.date_range_validation',
        )
    ```
    """

    pass

DatasetTypeDef

Bases: BaseObjectTypeDef

Definition class for a data set type. E.g.:

```python class RawData(BaseModel): defs = DatasetTypeDef( code='RAW_DATA', description='...', )

Source code in bam_masterdata/metadata/definitions.py
class DatasetTypeDef(BaseObjectTypeDef):
    """
    Definition class for a data set type. E.g.:

    ```python
    class RawData(BaseModel):
        defs = DatasetTypeDef(
            code='RAW_DATA',
            description='...',
        )
    """

    # TODO add descriptions for `main_dataset_pattern` and `main_dataset_path`

    main_dataset_pattern: str | None = Field(
        default=None,
        description="""""",
    )

    main_dataset_path: str | None = Field(
        default=None,
        description="""""",
    )

main_dataset_pattern = Field(default=None, description='')

main_dataset_path = Field(default=None, description='')

VocabularyTypeDef

Bases: EntityDef

Definition class for a vocabulary type. It adds the fields of url_template to the common attributes of an entity definition. E.g.:

class DocumentType(VocabularyType):
    defs = VocabularyTypeDef(
        code='DOCUMENT_TYPE',
        description='Document type//Dokumententypen',
    )
Source code in bam_masterdata/metadata/definitions.py
class VocabularyTypeDef(EntityDef):
    """
    Definition class for a vocabulary type. It adds the fields of `url_template` to the common attributes of
    an entity definition. E.g.:

    ```python
    class DocumentType(VocabularyType):
        defs = VocabularyTypeDef(
            code='DOCUMENT_TYPE',
            description='Document type//Dokumententypen',
        )
    ```
    """

    # TODO add descriptions for `url_template`

    url_template: str | None = Field(
        default=None,
        description="""""",
    )

url_template = Field(default=None, description='')

PropertyTypeDef

Bases: EntityDef

Definition class for a property type. It adds the fields of property_label, data_type, vocabulary_code, metadata, dynamic_script, and multivalued to the common attributes of an entity definition.

This class is used as an abstract layer for PropertyTypeAssignment, as in openBIS a PropertyType definition has less fields than when it is actually assigned to an entity type.

Source code in bam_masterdata/metadata/definitions.py
class PropertyTypeDef(EntityDef):
    """
    Definition class for a property type. It adds the fields of `property_label`, `data_type`,
    `vocabulary_code`, `metadata`, `dynamic_script`, and `multivalued` to the common attributes of
    an entity definition.

    This class is used as an abstract layer for `PropertyTypeAssignment`, as in openBIS a PropertyType
    definition has less fields than when it is actually assigned to an entity type.
    """

    property_label: str = Field(
        ...,
        description="""
        Label that appears in the inventory view. This is the human-readable text for the property
        type definition, and it typically coincides with the `code`, e.g., `'Monitoring date'` for the
        `MONITORING_DATE` property type.
        """,
    )

    data_type: DataType = Field(
        ...,
        description="""
        The data type of the property, i.e., if it is an integer, float, string, etc. The allowed
        data types in openBIS are:
            - `BOOLEAN`
            - `CONTROLLEDVOCABULARY`
            - `DATE`
            - `HYPERLINK`
            - `INTEGER`
            - `MATERIAL`
            - `MULTILINE_VARCHAR`
            - `OBJECT`
            - `SAMPLE`
            - `REAL`
            - `TIMESTAMP`
            - `VARCHAR`
            - `XML`

        These are defined as an enumeration in the `DataType` class.

        Read more in https://openbis.readthedocs.io/en/latest/uncategorized/register-master-data-via-the-admin-interface.html#data-types-available-in-openbis.
        """,
    )

    vocabulary_code: str | None = Field(
        default=None,
        description="""
        String identifying the controlled vocabulary used for the data type of the property. This is
        thus only relevant if `data_type == 'CONTROLLEDVOCABULARY'`.
        """,
    )

    object_code: str | None = Field(
        default=None,
        description="""
        String identifying the object type used for the data type of the property. This is only
        relevant if `data_type == 'OBJECT'`.
        """,
    )

    # TODO add descriptions for `dynamic_script`

    metadata: dict | None = Field(
        default=None,
        description="""
        General metadata written in a dictionary format. This is used to store additional information
        about the property type, e.g., `{'unit': 'm', 'precision': 2}`.
        """,
    )

    dynamic_script: str | None = Field(
        default=None,
        description="""""",
    )

property_label = Field(..., description="\n Label that appears in the inventory view. This is the human-readable text for the property\n type definition, and it typically coincides with the `code`, e.g., `'Monitoring date'` for the\n `MONITORING_DATE` property type.\n ")

data_type = Field(..., description='\n The data type of the property, i.e., if it is an integer, float, string, etc. The allowed\n data types in openBIS are:\n - `BOOLEAN`\n - `CONTROLLEDVOCABULARY`\n - `DATE`\n - `HYPERLINK`\n - `INTEGER`\n - `MATERIAL`\n - `MULTILINE_VARCHAR`\n - `OBJECT`\n - `SAMPLE`\n - `REAL`\n - `TIMESTAMP`\n - `VARCHAR`\n - `XML`\n\n These are defined as an enumeration in the `DataType` class.\n\n Read more in https://openbis.readthedocs.io/en/latest/uncategorized/register-master-data-via-the-admin-interface.html#data-types-available-in-openbis.\n ')

vocabulary_code = Field(default=None, description="\n String identifying the controlled vocabulary used for the data type of the property. This is\n thus only relevant if `data_type == 'CONTROLLEDVOCABULARY'`.\n ")

object_code = Field(default=None, description="\n String identifying the object type used for the data type of the property. This is only\n relevant if `data_type == 'OBJECT'`.\n ")

metadata = Field(default=None, description="\n General metadata written in a dictionary format. This is used to store additional information\n about the property type, e.g., `{'unit': 'm', 'precision': 2}`.\n ")

dynamic_script = Field(default=None, description='')

PropertyTypeAssignment

Bases: PropertyTypeDef

Base class used to define properties inside ObjectType, CollectionType, or DatasetType. This is used to construct these types by assigning property types to them. It adds the fields of mandatory, show_in_edit_views, section, unique, and internal_assignment to the common attributes of a property type definition. E.g.:

class Instrument(ObjectType):
    defs = ObjectTypeDef(
        code='INSTRUMENT',
        description='
        Measuring Instrument//Messgerät
        ',
        generated_code_prefix='INS',
    )

    alias = PropertyTypeAssignment(
        code='ALIAS',
        data_type='VARCHAR',
        property_label='Alternative name',
        description='
        e.g. abbreviation or nickname//z.B. Abkürzung oder Spitzname//z.B. Abkürzung oder Spitzname
        ',
        mandatory=False,
        show_in_edit_views=True,
        section='General information',
    )

    # ... other property type assignments here ...
Source code in bam_masterdata/metadata/definitions.py
class PropertyTypeAssignment(PropertyTypeDef):
    """
    Base class used to define properties inside `ObjectType`, `CollectionType`, or `DatasetType`.
    This is used to construct these types by assigning property types to them. It adds the fields
    of `mandatory`, `show_in_edit_views`, `section`, `unique`, and `internal_assignment` to the common
    attributes of a property type definition. E.g.:

    ```python
    class Instrument(ObjectType):
        defs = ObjectTypeDef(
            code='INSTRUMENT',
            description='
            Measuring Instrument//Messger\u00e4t
            ',
            generated_code_prefix='INS',
        )

        alias = PropertyTypeAssignment(
            code='ALIAS',
            data_type='VARCHAR',
            property_label='Alternative name',
            description='
            e.g. abbreviation or nickname//z.B. Abkürzung oder Spitzname//z.B. Abkürzung oder Spitzname
            ',
            mandatory=False,
            show_in_edit_views=True,
            section='General information',
        )

        # ... other property type assignments here ...
    ```
    """

    mandatory: bool = Field(
        ...,
        description="""
        If `True`, the property is mandatory and has to be set during instantiation of the object type.
        If `False`, the property is optional.
        """,
    )

    show_in_edit_views: bool = Field(
        ...,
        description="""
        If `True`, the property is shown in the edit views of the ELN in the object type instantiation.
        If `False`, the property is hidden.
        """,
    )

    section: str = Field(
        ...,
        description="""
        Section to which the property type belongs to. E.g., `'General Information'`.
        """,
    )

    # TODO add descriptions for `unique` and `internal_assignment`

    unique: str | None = Field(
        default=None,
        description="""""",
    )

    internal_assignment: str | None = Field(
        default=None,
        description="""""",
    )

mandatory = Field(..., description='\n If `True`, the property is mandatory and has to be set during instantiation of the object type.\n If `False`, the property is optional.\n ')

show_in_edit_views = Field(..., description='\n If `True`, the property is shown in the edit views of the ELN in the object type instantiation.\n If `False`, the property is hidden.\n ')

section = Field(..., description="\n Section to which the property type belongs to. E.g., `'General Information'`.\n ")

unique = Field(default=None, description='')

internal_assignment = Field(default=None, description='')

VocabularyTerm

Bases: VocabularyTypeDef

Base class used to define terms inside a VocabularyType. This is used to construct the vocabulary types by assigning vocabulary terms to them. It adds the fields of label and official to the common attributes of a vocabulary type definition. E.g.:

```python class DocumentType(VocabularyType): defs = VocabularyTypeDef( code='DOCUMENT_TYPE', description='Document type//Dokumententypen', )

acceptance_certificate = VocabularyTerm(
    code='ACCEPTANCE_CERTIFICATE',
    label='Acceptance Certificate',
    description='Acceptance Certificate//Abnahmezeugnis',
)

calibration_certificate = VocabularyTerm(
    code='CALIBRATION_CERTIFICATE',
    label='Calibration Certificate',
    description='Calibration Certificate//Kalibrierschein',
)

# ... other vocabulary term definitions here ...
Source code in bam_masterdata/metadata/definitions.py
class VocabularyTerm(VocabularyTypeDef):
    """
    Base class used to define terms inside a `VocabularyType`. This is used to construct the vocabulary types
    by assigning vocabulary terms to them. It adds the fields of `label` and `official` to the common attributes
    of a vocabulary type definition. E.g.:

    ```python
    class DocumentType(VocabularyType):
        defs = VocabularyTypeDef(
            code='DOCUMENT_TYPE',
            description='Document type//Dokumententypen',
        )

        acceptance_certificate = VocabularyTerm(
            code='ACCEPTANCE_CERTIFICATE',
            label='Acceptance Certificate',
            description='Acceptance Certificate//Abnahmezeugnis',
        )

        calibration_certificate = VocabularyTerm(
            code='CALIBRATION_CERTIFICATE',
            label='Calibration Certificate',
            description='Calibration Certificate//Kalibrierschein',
        )

        # ... other vocabulary term definitions here ...
    """

    # TODO add descriptions for `label` and `official`

    label: str = Field(
        ...,
        description="""""",
    )

    official: bool = Field(
        True,
        description="""""",
    )

label = Field(..., description='')

official = Field(True, description='')


bam_masterdata.metadata.entities_dict

EntitiesDict

Class to convert the entities in the datamodel defined in Python to a dictionary. The entities are read from the Python files defined in python_path.

Source code in bam_masterdata/metadata/entities_dict.py
class EntitiesDict:
    """
    Class to convert the entities in the datamodel defined in Python to a dictionary. The entities are read from the Python
    files defined in `python_path`.
    """

    def __init__(self, python_path: str = "", **kwargs):
        self.python_path = python_path
        self.logger = kwargs.get("logger", logger)
        self.data: dict = {}

    def to_dict(self, module_path: str) -> dict:
        """
        Returns a dictionary containing entities read from the `module_path` Python file. The Python modules
        are imported using the function `import_module` and their contents are inspected (using `inspect`) to
        find the classes in the datamodel containing `defs` and with a `model_to_dict` method defined.

        Args:
            module_path (str): Path to the Python module file.

        Returns:
            dict: A dictionary containing the entities in the datamodel defined in one Python module file.
        """
        module = import_module(module_path=module_path)

        # initializing the dictionary with keys as the `code` of the entity and values the json dumped data
        data: dict = {}

        # Read the module source code and store line numbers
        with open(module_path, encoding="utf-8") as f:
            module_source = f.readlines()

        # Detect class definitions (entity types)
        class_locations = {
            match.group(1): i + 1  # Store line number (1-based index)
            for i, line in enumerate(module_source)
            if (match := re.match(r"^\s*class\s+(\w+)\s*\(.*\):", line))
        }

        # Detect property assignments (`PropertyTypeAssignment(...)`) with class context
        property_locations: dict = {}
        current_class = None

        for i, line in enumerate(module_source):
            class_match = re.match(r"^\s*class\s+(\w+)\s*\(.*\):", line)
            if class_match:
                current_class = class_match.group(1)

            prop_match = re.search(r"^\s*(\w+)\s*=\s*PropertyTypeAssignment\(", line)
            if prop_match and current_class:
                property_name = prop_match.group(1)
                if current_class not in property_locations:
                    property_locations[current_class] = {}
                property_locations[current_class][property_name] = i + 1

        # Detect vocabulary terms (`VocabularyTerm(...)`) with class context
        vocabulary_term_locations: dict = {}
        current_vocab_class = None

        for i, line in enumerate(module_source):
            class_match = re.match(r"^\s*class\s+(\w+)\s*\(.*\):", line)
            if class_match:
                current_vocab_class = class_match.group(1)

            term_match = re.search(r"^\s*(\w+)\s*=\s*VocabularyTerm\(", line)
            if term_match and current_vocab_class:
                term_name = term_match.group(1)
                if current_vocab_class not in vocabulary_term_locations:
                    vocabulary_term_locations[current_vocab_class] = {}
                vocabulary_term_locations[current_vocab_class][term_name] = i + 1

        # Process all classes in the module
        for name, obj in inspect.getmembers(module, inspect.isclass):
            if not hasattr(obj, "defs") or not callable(getattr(obj, "model_to_dict")):
                continue
            try:
                obj_data = obj().model_to_dict()
                obj_data["defs"]["row_location"] = class_locations.get(name, None)

                if "properties" in obj_data:
                    # Processing standard properties (PropertyTypeAssignment)
                    for prop in obj_data["properties"]:
                        prop_id = (
                            prop["code"].lower().replace(".", "_").replace("$", "")
                        )
                        matched_key = next(
                            (
                                key
                                for key in property_locations.get(name, {})
                                if key == prop_id
                            ),
                            None,
                        )
                        prop["row_location"] = property_locations.get(name, {}).get(
                            matched_key, None
                        )

                elif "terms" in obj_data:
                    # Processing vocabulary terms (VocabularyTerm)
                    for term in obj_data["terms"]:
                        term_id = term["code"].lower().replace(".", "_")
                        matched_key = next(
                            (
                                key
                                for key in vocabulary_term_locations.get(name, {})
                                if key == term_id
                            ),
                            None,
                        )
                        term["row_location"] = vocabulary_term_locations.get(
                            name, {}
                        ).get(matched_key, None)

                data[obj.defs.code] = obj_data
            except Exception as err:
                click.echo(f"Failed to process class {name} in {module_path}: {err}")

        return data

    def single_json(self) -> dict:
        """
        Returns a single dictionary containing all the entities in the datamodel defined in the Python files
        in `python_path`. The format of this dictionary is:
            {
                "collection_type": {
                    "COLLECTION": {
                        "defs": {
                            "code": "COLLECTION",
                            "description": "",
                            ...
                        },
                        "properties": [
                            {
                                "code": "$DEFAULT_COLLECTION_VIEW",
                                "description": "Default view for experiments of the type collection",
                                ...
                            },
                            {...},
                            ...
                        ]
                    }
                },
                "object_type": {...},
                ...
            }

        Returns:
            dict: A dictionary containing all the entities in the datamodel.
        """
        # Get the Python modules to process the datamodel
        py_modules = listdir_py_modules(
            directory_path=self.python_path, logger=self.logger
        )

        # Process each module using the `model_to_dict` method of each entity and store them in a single dictionary
        full_data: dict = {}
        for module_path in py_modules:
            data = self.to_dict(module_path=module_path)
            # name can be collection_type, object_type, dataset_type, vocabulary_type, or property_type
            name = os.path.basename(module_path).replace(".py", "")
            full_data[name] = data
        return full_data

python_path = python_path

logger = kwargs.get('logger', logger)

data = {}

__init__(python_path='', **kwargs)

Source code in bam_masterdata/metadata/entities_dict.py
def __init__(self, python_path: str = "", **kwargs):
    self.python_path = python_path
    self.logger = kwargs.get("logger", logger)
    self.data: dict = {}

to_dict(module_path)

Returns a dictionary containing entities read from the module_path Python file. The Python modules are imported using the function import_module and their contents are inspected (using inspect) to find the classes in the datamodel containing defs and with a model_to_dict method defined.

PARAMETER DESCRIPTION
module_path

Path to the Python module file.

TYPE: str

RETURNS DESCRIPTION
dict

A dictionary containing the entities in the datamodel defined in one Python module file.

TYPE: dict

Source code in bam_masterdata/metadata/entities_dict.py
def to_dict(self, module_path: str) -> dict:
    """
    Returns a dictionary containing entities read from the `module_path` Python file. The Python modules
    are imported using the function `import_module` and their contents are inspected (using `inspect`) to
    find the classes in the datamodel containing `defs` and with a `model_to_dict` method defined.

    Args:
        module_path (str): Path to the Python module file.

    Returns:
        dict: A dictionary containing the entities in the datamodel defined in one Python module file.
    """
    module = import_module(module_path=module_path)

    # initializing the dictionary with keys as the `code` of the entity and values the json dumped data
    data: dict = {}

    # Read the module source code and store line numbers
    with open(module_path, encoding="utf-8") as f:
        module_source = f.readlines()

    # Detect class definitions (entity types)
    class_locations = {
        match.group(1): i + 1  # Store line number (1-based index)
        for i, line in enumerate(module_source)
        if (match := re.match(r"^\s*class\s+(\w+)\s*\(.*\):", line))
    }

    # Detect property assignments (`PropertyTypeAssignment(...)`) with class context
    property_locations: dict = {}
    current_class = None

    for i, line in enumerate(module_source):
        class_match = re.match(r"^\s*class\s+(\w+)\s*\(.*\):", line)
        if class_match:
            current_class = class_match.group(1)

        prop_match = re.search(r"^\s*(\w+)\s*=\s*PropertyTypeAssignment\(", line)
        if prop_match and current_class:
            property_name = prop_match.group(1)
            if current_class not in property_locations:
                property_locations[current_class] = {}
            property_locations[current_class][property_name] = i + 1

    # Detect vocabulary terms (`VocabularyTerm(...)`) with class context
    vocabulary_term_locations: dict = {}
    current_vocab_class = None

    for i, line in enumerate(module_source):
        class_match = re.match(r"^\s*class\s+(\w+)\s*\(.*\):", line)
        if class_match:
            current_vocab_class = class_match.group(1)

        term_match = re.search(r"^\s*(\w+)\s*=\s*VocabularyTerm\(", line)
        if term_match and current_vocab_class:
            term_name = term_match.group(1)
            if current_vocab_class not in vocabulary_term_locations:
                vocabulary_term_locations[current_vocab_class] = {}
            vocabulary_term_locations[current_vocab_class][term_name] = i + 1

    # Process all classes in the module
    for name, obj in inspect.getmembers(module, inspect.isclass):
        if not hasattr(obj, "defs") or not callable(getattr(obj, "model_to_dict")):
            continue
        try:
            obj_data = obj().model_to_dict()
            obj_data["defs"]["row_location"] = class_locations.get(name, None)

            if "properties" in obj_data:
                # Processing standard properties (PropertyTypeAssignment)
                for prop in obj_data["properties"]:
                    prop_id = (
                        prop["code"].lower().replace(".", "_").replace("$", "")
                    )
                    matched_key = next(
                        (
                            key
                            for key in property_locations.get(name, {})
                            if key == prop_id
                        ),
                        None,
                    )
                    prop["row_location"] = property_locations.get(name, {}).get(
                        matched_key, None
                    )

            elif "terms" in obj_data:
                # Processing vocabulary terms (VocabularyTerm)
                for term in obj_data["terms"]:
                    term_id = term["code"].lower().replace(".", "_")
                    matched_key = next(
                        (
                            key
                            for key in vocabulary_term_locations.get(name, {})
                            if key == term_id
                        ),
                        None,
                    )
                    term["row_location"] = vocabulary_term_locations.get(
                        name, {}
                    ).get(matched_key, None)

            data[obj.defs.code] = obj_data
        except Exception as err:
            click.echo(f"Failed to process class {name} in {module_path}: {err}")

    return data

single_json()

Returns a single dictionary containing all the entities in the datamodel defined in the Python files in python_path. The format of this dictionary is: { "collection_type": { "COLLECTION": { "defs": { "code": "COLLECTION", "description": "", ... }, "properties": [ { "code": "$DEFAULT_COLLECTION_VIEW", "description": "Default view for experiments of the type collection", ... }, {...}, ... ] } }, "object_type": {...}, ... }

RETURNS DESCRIPTION
dict

A dictionary containing all the entities in the datamodel.

TYPE: dict

Source code in bam_masterdata/metadata/entities_dict.py
def single_json(self) -> dict:
    """
    Returns a single dictionary containing all the entities in the datamodel defined in the Python files
    in `python_path`. The format of this dictionary is:
        {
            "collection_type": {
                "COLLECTION": {
                    "defs": {
                        "code": "COLLECTION",
                        "description": "",
                        ...
                    },
                    "properties": [
                        {
                            "code": "$DEFAULT_COLLECTION_VIEW",
                            "description": "Default view for experiments of the type collection",
                            ...
                        },
                        {...},
                        ...
                    ]
                }
            },
            "object_type": {...},
            ...
        }

    Returns:
        dict: A dictionary containing all the entities in the datamodel.
    """
    # Get the Python modules to process the datamodel
    py_modules = listdir_py_modules(
        directory_path=self.python_path, logger=self.logger
    )

    # Process each module using the `model_to_dict` method of each entity and store them in a single dictionary
    full_data: dict = {}
    for module_path in py_modules:
        data = self.to_dict(module_path=module_path)
        # name can be collection_type, object_type, dataset_type, vocabulary_type, or property_type
        name = os.path.basename(module_path).replace(".py", "")
        full_data[name] = data
    return full_data

bam_masterdata.cli.fill_masterdata

MasterdataCodeGenerator

Class to generate Python code for the masterdata datamodel based on the entities existing in an openBIS instance.

Source code in bam_masterdata/cli/fill_masterdata.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class MasterdataCodeGenerator:
    """
    Class to generate Python code for the masterdata datamodel based on the entities existing in an
    openBIS instance.
    """

    def __init__(self, url: str = "", path: str = "", **kwargs):
        start_time = time.time()
        self.row_cell_info = kwargs.get("row_cell_info", False)
        # * This part takes some time due to the loading of all entities from Openbis
        if url:
            self.generator_type = "openbis"
            self.properties = OpenbisEntities(url=url).get_property_dict()
            self.collections = OpenbisEntities(url=url).get_collection_dict()
            self.datasets = OpenbisEntities(url=url).get_dataset_dict()
            self.objects = OpenbisEntities(url=url).get_object_dict()
            self.vocabularies = OpenbisEntities(url=url).get_vocabulary_dict()
            elapsed_time = time.time() - start_time
            click.echo(
                f"Loaded OpenBIS entities in `MasterdataCodeGenerator` initialization {elapsed_time:.2f} seconds\n"
            )
        else:
            self.generator_type = "excel"
            entities_dict = MasterdataExcelExtractor(
                excel_path=path, row_cell_info=self.row_cell_info
            ).excel_to_entities()
            self.properties = entities_dict.get("property_types", {})
            self.collections = entities_dict.get("collection_types", {})
            self.datasets = entities_dict.get("dataset_types", {})
            self.objects = entities_dict.get("object_types", {})
            self.vocabularies = entities_dict.get("vocabulary_types", {})
            elapsed_time = time.time() - start_time
            click.echo(
                f"Loaded Masterdata excel entities in `MasterdataCodeGenerator` initialization {elapsed_time:.2f} seconds\n"
            )

    def determine_parent_class(
        self, code: str, class_names: dict, default: str, lines: list
    ) -> tuple:
        """
        Determine the parent class information of the entity based on its `code`. It returns
        the `parent_code` and `parent class`, as well as the `class_name` of the entity. The
        class will inherit from `parent_class`.

        If the parent class does not exist, a note is added to the `lines` list for debugging purposes.

        Args:
            code (str): The code of the entity.
            class_names (dict): A dictionary with the class names of the entities.
            default (str): The default parent class if the parent class does not exist.
            lines (list): A list of strings to be printed to the Python module.
        Returns:
            tuple: The parent code, parent class, and class name of the entity.
        """
        parent_code = ""
        if "." in code:
            parent_code = code.rsplit(".", 1)[0]
        parent_class = class_names.get(parent_code, default)

        # Format class name
        class_name = code_to_class_name(code)
        class_names[code] = class_name

        # If the parent class does not exist but the `code` shows some inheritance, we add a note for debugging
        if parent_code and parent_class == default:
            lines.append(
                f"# ! The parent class of {class_name} is not defined (missing {parent_class})"
            )

        return parent_code, parent_class, class_name

    def get_property_object_code(self, prop_data: dict) -> str:
        """
        Get the object code (or vocabulary code) used for reference for the assigned property with `prop_code`.

        Args:
            prop_data (dict): The data information for the property as obtained from openBIS.

        Returns:
            str: The object/vocabulary code used for reference for the assigned property.
        """
        if not prop_data:
            return ""

        # TODO check excel extractor to add sampleType column
        object_code = prop_data.get("sampleType", "")
        if object_code:
            return object_code

        # TODO fix this patch and avoid using generator type
        vocabulary_code = ""
        if self.generator_type == "openbis":
            vocabulary_code = prop_data.get("vocabulary", "")
        elif self.generator_type == "excel":
            vocabulary_code = prop_data.get("vocabularyType", "")
        return vocabulary_code

    def add_properties(
        self, entities: dict, parent_code: str, data: dict, lines: list
    ) -> None:
        """
        Add the properties of the entity to the `lines` list. The properties are added as
        `PropertyTypeAssignment` objects.

        Note: the assigned properties do not have the information of `code` for the entity when
        data_type is OBJECT or CONTROLLEDVOCABULARY. These are instead defined in `property_types.py`.

        Args:
            entities (dict): The dictionary of entities (objects, collections, datasets, vocabularies).
            parent_code (code): The code of the parent class.
            data (dict): The data information for the entity as obtained from openBIS.
            lines (list): A list of strings to be printed to the Python module.
        """
        parent_properties_code = (
            entities.get(parent_code, {}).get("properties", {}).keys()
        )
        for prop_code, prop_data in data.get("properties", {}).items():
            # Skip "UNKNOWN" properties
            # We check if the property is inherited from the parent class
            if prop_code == "UNKNOWN" or prop_code in parent_properties_code:
                continue

            prop_name = prop_code.lstrip("$").replace(".", "_").lower()
            lines.append(f"    {prop_name} = PropertyTypeAssignment(")
            lines.append(f'        code="{prop_code}",')
            # ! patching dataType=SAMPLE instead of OBJECT
            data_type = prop_data.get("dataType", "")
            if data_type == "SAMPLE":
                data_type = "OBJECT"
            lines.append(f'        data_type="{data_type}",')
            if data_type == "OBJECT":
                object_code = self.get_property_object_code(prop_data=prop_data)
                if object_code:
                    lines.append(f'        object_code="{object_code}",')
            elif data_type == "CONTROLLEDVOCABULARY":
                vocabulary_code = self.get_property_object_code(prop_data=prop_data)
                if vocabulary_code:
                    lines.append(f'        vocabulary_code="{vocabulary_code}",')

            property_label = (prop_data.get("label") or "").replace("\n", "\\n")
            lines.append(f'        property_label="{property_label}",')
            description = (
                (prop_data.get("description") or "")
                .replace('"', "`")
                .replace("\n", "\\n")
                .replace("'", "`")
            )
            lines.append(f'        description="""{description}""",')
            lines.append(f"        mandatory={prop_data.get('mandatory', False)},")
            lines.append(
                f"        show_in_edit_views={prop_data.get('show_in_edit_views', False)},"
            )
            section = (
                (prop_data.get("section") or "")
                .replace('"', '\\"')
                .replace("\n", "\\n")
                .replace("'", "\\'")
            )
            lines.append(f'        section="{section}",')
            lines.append("    )")
            lines.append("")

    def generate_collection_types(self) -> str:
        """
        Generate Python code for the collection types in the Openbis datamodel. The code is generated
        as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/collection_types.py`.

        Returns:
            str: Python code for the collection types.
        """
        lines = []
        class_names: dict = {}
        # from bam_masterdata.metadata.definitions import (
        #     CollectionTypeDef,
        #     PropertyTypeAssignment,
        # )
        if self.collections != {}:
            # Add imports at the top
            lines.append("from bam_masterdata.metadata.definitions import (")
            lines.append("    CollectionTypeDef,")
            lines.append("    PropertyTypeAssignment,")
            lines.append(")")
            lines.append("from bam_masterdata.metadata.entities import CollectionType")
            lines.append("")
            lines.append("")

        # Process each collection type
        for code, data in self.collections.items():
            # Skip the "UNKNOWN" object type
            if code == "UNKNOWN":
                continue

            # Determine parent class
            parent_code, parent_class, class_name = self.determine_parent_class(
                code=code,
                class_names=class_names,
                default="CollectionType",
                lines=lines,
            )

            # Add class definition
            lines.append(f"class {class_name}({parent_class}):")
            lines.append("    defs = CollectionTypeDef(")
            lines.append(f'        code="{code}",')
            description = (
                (data.get("description") or "")
                .replace('"', "`")
                .replace("\n", "\\n")
                .replace("'", "`")
            )
            lines.append(f'        description="""{description}""",')
            if data.get("validationPlugin") != "":
                lines.append(
                    f'        validation_script="{data.get("validationPlugin")}",'
                )
            lines.append("    )")
            lines.append("")

            # Add properties
            self.add_properties(self.collections, parent_code, data, lines)
            # Add newline between classes
            lines.append("")

        return "\n".join(lines)

    def generate_dataset_types(self) -> str:
        """
        Generate Python code for the dataset types in the Openbis datamodel. The code is generated
        as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/dataset_types.py`.

        Returns:
            str: Python code for the dataset types.
        """
        lines = []
        class_names: dict = {}

        if self.datasets != {}:
            # Add imports at the top
            lines.append(
                "from bam_masterdata.metadata.definitions import DatasetTypeDef, PropertyTypeAssignment"
            )
            lines.append("from bam_masterdata.metadata.entities import DatasetType")
            lines.append("")
            lines.append("")

        # Process each dataset type
        for code, data in self.datasets.items():
            # Skip the "UNKNOWN" object type
            if code == "UNKNOWN":
                continue

            # Determine parent class
            parent_code, parent_class, class_name = self.determine_parent_class(
                code=code, class_names=class_names, default="DatasetType", lines=lines
            )

            # Add class definition
            lines.append(f"class {class_name}({parent_class}):")
            lines.append("    defs = DatasetTypeDef(")
            lines.append(f'        code="{code}",')
            description = (
                (data.get("description") or "")
                .replace('"', "`")
                .replace("\n", "\\n")
                .replace("'", "`")
            )
            lines.append(f'        description="""{description}""",')
            lines.append("    )")
            lines.append("")

            # Add properties
            self.add_properties(self.datasets, parent_code, data, lines)
            # Add newline between classes
            lines.append("")

        return "\n".join(lines)

    def generate_object_types(self) -> str:
        """
        Generate Python code for the object types in the Openbis datamodel. The code is generated
        as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/object_types.py`.

        Returns:
            str: Python code for the object types.
        """
        lines = []
        class_names: dict = {}

        if self.objects != {}:
            # Add imports at the top
            lines.append(
                "from bam_masterdata.metadata.definitions import ObjectTypeDef, PropertyTypeAssignment"
            )
            lines.append("from bam_masterdata.metadata.entities import ObjectType")
            lines.append("")
            lines.append("")

        # Process each object type
        for code, data in self.objects.items():
            # Skip the "UNKNOWN" object type
            if code == "UNKNOWN":
                continue

            # Determine parent class
            parent_code, parent_class, class_name = self.determine_parent_class(
                code=code, class_names=class_names, default="ObjectType", lines=lines
            )

            # Add class definition
            lines.append(f"class {class_name}({parent_class}):")
            lines.append("    defs = ObjectTypeDef(")
            lines.append(f'        code="{code}",')
            description = (
                (data.get("description") or "")
                .replace('"', "`")
                .replace("\n", "\\n")
                .replace("'", "`")
            )
            lines.append(f'        description="""{description}""",')
            lines.append(
                f'        generated_code_prefix="{data.get("generatedCodePrefix", "")}",'
            )
            lines.append("    )")
            lines.append("")

            # Add properties
            self.add_properties(self.objects, parent_code, data, lines)
            # Add newline between classes
            lines.append("")

        return "\n".join(lines)

    def generate_vocabulary_types(self) -> str:
        """
        Generate Python code for the vocabulary types in the Openbis datamodel. The code is generated
        as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/vocabulary_types.py`.

        Returns:
            str: Python code for the vocabulary types.
        """
        lines = []
        class_names: dict = {}

        if self.vocabularies != {}:
            # Add imports at the top
            lines.append(
                "from bam_masterdata.metadata.definitions import VocabularyTerm, VocabularyTypeDef"
            )
            lines.append("from bam_masterdata.metadata.entities import VocabularyType")
            lines.append("")
            lines.append("")

        # Process each object type
        for code, data in self.vocabularies.items():
            # Skip the "UNKNOWN" object type
            if code == "UNKNOWN":
                continue

            # Determine parent class
            parent_code, parent_class, class_name = self.determine_parent_class(
                code=code,
                class_names=class_names,
                default="VocabularyType",
                lines=lines,
            )

            # Add class definition
            lines.append(f"class {class_name}({parent_class}):")
            lines.append("    defs = VocabularyTypeDef(")
            lines.append(f'        code="{code}",')
            description = (
                (data.get("description") or "")
                .replace('"', "`")
                .replace("\n", "\\n")
                .replace("'", "`")
            )
            lines.append(f'        description="""{description}""",')
            lines.append("    )")
            lines.append("")

            # Add terms
            parent_terms = self.objects.get(parent_code, {}).get("terms", {}).keys()
            for term_code, term_data in data.get("terms", {}).items():
                # Skip "UNKNOWN" properties
                if term_code == "UNKNOWN":
                    continue

                # We check if the term is inherited from the parent class
                if term_code in parent_terms:
                    continue

                term_name = (
                    term_code.lstrip("$").replace(".", "_").replace("-", "_").lower()
                )
                if term_name[0].isdigit():
                    term_name = f"_{term_name}"
                if term_name == "l":
                    term_name = "L"
                if term_name == "O":
                    term_name = "o"
                if term_name == "I":
                    term_name = "i"
                lines.append(f"    {term_name} = VocabularyTerm(")
                lines.append(f'        code="{term_code}",')
                label = (term_data.get("label") or "").replace('"', "")
                lines.append(f'        label="{label}",')
                description = (
                    (term_data.get("description") or "")
                    .replace('"', "`")
                    .replace("\n", "\\n")
                    .replace("'", "`")
                )
                lines.append(f'        description="""{description}""",')
                lines.append("    )")
                lines.append("")

            # Add newline between classes
            lines.append("")

        return "\n".join(lines)

row_cell_info = kwargs.get('row_cell_info', False)

generator_type = 'openbis'

properties = OpenbisEntities(url=url).get_property_dict()

collections = OpenbisEntities(url=url).get_collection_dict()

datasets = OpenbisEntities(url=url).get_dataset_dict()

objects = OpenbisEntities(url=url).get_object_dict()

vocabularies = OpenbisEntities(url=url).get_vocabulary_dict()

__init__(url='', path='', **kwargs)

Source code in bam_masterdata/cli/fill_masterdata.py
def __init__(self, url: str = "", path: str = "", **kwargs):
    start_time = time.time()
    self.row_cell_info = kwargs.get("row_cell_info", False)
    # * This part takes some time due to the loading of all entities from Openbis
    if url:
        self.generator_type = "openbis"
        self.properties = OpenbisEntities(url=url).get_property_dict()
        self.collections = OpenbisEntities(url=url).get_collection_dict()
        self.datasets = OpenbisEntities(url=url).get_dataset_dict()
        self.objects = OpenbisEntities(url=url).get_object_dict()
        self.vocabularies = OpenbisEntities(url=url).get_vocabulary_dict()
        elapsed_time = time.time() - start_time
        click.echo(
            f"Loaded OpenBIS entities in `MasterdataCodeGenerator` initialization {elapsed_time:.2f} seconds\n"
        )
    else:
        self.generator_type = "excel"
        entities_dict = MasterdataExcelExtractor(
            excel_path=path, row_cell_info=self.row_cell_info
        ).excel_to_entities()
        self.properties = entities_dict.get("property_types", {})
        self.collections = entities_dict.get("collection_types", {})
        self.datasets = entities_dict.get("dataset_types", {})
        self.objects = entities_dict.get("object_types", {})
        self.vocabularies = entities_dict.get("vocabulary_types", {})
        elapsed_time = time.time() - start_time
        click.echo(
            f"Loaded Masterdata excel entities in `MasterdataCodeGenerator` initialization {elapsed_time:.2f} seconds\n"
        )

determine_parent_class(code, class_names, default, lines)

Determine the parent class information of the entity based on its code. It returns the parent_code and parent class, as well as the class_name of the entity. The class will inherit from parent_class.

If the parent class does not exist, a note is added to the lines list for debugging purposes.

PARAMETER DESCRIPTION
code

The code of the entity.

TYPE: str

class_names

A dictionary with the class names of the entities.

TYPE: dict

default

The default parent class if the parent class does not exist.

TYPE: str

lines

A list of strings to be printed to the Python module.

TYPE: list

Returns: tuple: The parent code, parent class, and class name of the entity.

Source code in bam_masterdata/cli/fill_masterdata.py
def determine_parent_class(
    self, code: str, class_names: dict, default: str, lines: list
) -> tuple:
    """
    Determine the parent class information of the entity based on its `code`. It returns
    the `parent_code` and `parent class`, as well as the `class_name` of the entity. The
    class will inherit from `parent_class`.

    If the parent class does not exist, a note is added to the `lines` list for debugging purposes.

    Args:
        code (str): The code of the entity.
        class_names (dict): A dictionary with the class names of the entities.
        default (str): The default parent class if the parent class does not exist.
        lines (list): A list of strings to be printed to the Python module.
    Returns:
        tuple: The parent code, parent class, and class name of the entity.
    """
    parent_code = ""
    if "." in code:
        parent_code = code.rsplit(".", 1)[0]
    parent_class = class_names.get(parent_code, default)

    # Format class name
    class_name = code_to_class_name(code)
    class_names[code] = class_name

    # If the parent class does not exist but the `code` shows some inheritance, we add a note for debugging
    if parent_code and parent_class == default:
        lines.append(
            f"# ! The parent class of {class_name} is not defined (missing {parent_class})"
        )

    return parent_code, parent_class, class_name

get_property_object_code(prop_data)

Get the object code (or vocabulary code) used for reference for the assigned property with prop_code.

PARAMETER DESCRIPTION
prop_data

The data information for the property as obtained from openBIS.

TYPE: dict

RETURNS DESCRIPTION
str

The object/vocabulary code used for reference for the assigned property.

TYPE: str

Source code in bam_masterdata/cli/fill_masterdata.py
def get_property_object_code(self, prop_data: dict) -> str:
    """
    Get the object code (or vocabulary code) used for reference for the assigned property with `prop_code`.

    Args:
        prop_data (dict): The data information for the property as obtained from openBIS.

    Returns:
        str: The object/vocabulary code used for reference for the assigned property.
    """
    if not prop_data:
        return ""

    # TODO check excel extractor to add sampleType column
    object_code = prop_data.get("sampleType", "")
    if object_code:
        return object_code

    # TODO fix this patch and avoid using generator type
    vocabulary_code = ""
    if self.generator_type == "openbis":
        vocabulary_code = prop_data.get("vocabulary", "")
    elif self.generator_type == "excel":
        vocabulary_code = prop_data.get("vocabularyType", "")
    return vocabulary_code

add_properties(entities, parent_code, data, lines)

Add the properties of the entity to the lines list. The properties are added as PropertyTypeAssignment objects.

Note: the assigned properties do not have the information of code for the entity when data_type is OBJECT or CONTROLLEDVOCABULARY. These are instead defined in property_types.py.

PARAMETER DESCRIPTION
entities

The dictionary of entities (objects, collections, datasets, vocabularies).

TYPE: dict

parent_code

The code of the parent class.

TYPE: code

data

The data information for the entity as obtained from openBIS.

TYPE: dict

lines

A list of strings to be printed to the Python module.

TYPE: list

Source code in bam_masterdata/cli/fill_masterdata.py
def add_properties(
    self, entities: dict, parent_code: str, data: dict, lines: list
) -> None:
    """
    Add the properties of the entity to the `lines` list. The properties are added as
    `PropertyTypeAssignment` objects.

    Note: the assigned properties do not have the information of `code` for the entity when
    data_type is OBJECT or CONTROLLEDVOCABULARY. These are instead defined in `property_types.py`.

    Args:
        entities (dict): The dictionary of entities (objects, collections, datasets, vocabularies).
        parent_code (code): The code of the parent class.
        data (dict): The data information for the entity as obtained from openBIS.
        lines (list): A list of strings to be printed to the Python module.
    """
    parent_properties_code = (
        entities.get(parent_code, {}).get("properties", {}).keys()
    )
    for prop_code, prop_data in data.get("properties", {}).items():
        # Skip "UNKNOWN" properties
        # We check if the property is inherited from the parent class
        if prop_code == "UNKNOWN" or prop_code in parent_properties_code:
            continue

        prop_name = prop_code.lstrip("$").replace(".", "_").lower()
        lines.append(f"    {prop_name} = PropertyTypeAssignment(")
        lines.append(f'        code="{prop_code}",')
        # ! patching dataType=SAMPLE instead of OBJECT
        data_type = prop_data.get("dataType", "")
        if data_type == "SAMPLE":
            data_type = "OBJECT"
        lines.append(f'        data_type="{data_type}",')
        if data_type == "OBJECT":
            object_code = self.get_property_object_code(prop_data=prop_data)
            if object_code:
                lines.append(f'        object_code="{object_code}",')
        elif data_type == "CONTROLLEDVOCABULARY":
            vocabulary_code = self.get_property_object_code(prop_data=prop_data)
            if vocabulary_code:
                lines.append(f'        vocabulary_code="{vocabulary_code}",')

        property_label = (prop_data.get("label") or "").replace("\n", "\\n")
        lines.append(f'        property_label="{property_label}",')
        description = (
            (prop_data.get("description") or "")
            .replace('"', "`")
            .replace("\n", "\\n")
            .replace("'", "`")
        )
        lines.append(f'        description="""{description}""",')
        lines.append(f"        mandatory={prop_data.get('mandatory', False)},")
        lines.append(
            f"        show_in_edit_views={prop_data.get('show_in_edit_views', False)},"
        )
        section = (
            (prop_data.get("section") or "")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("'", "\\'")
        )
        lines.append(f'        section="{section}",')
        lines.append("    )")
        lines.append("")

generate_collection_types()

Generate Python code for the collection types in the Openbis datamodel. The code is generated as a string which is then printed out to the specific Python module in bam_masterdata/datamodel/collection_types.py.

RETURNS DESCRIPTION
str

Python code for the collection types.

TYPE: str

Source code in bam_masterdata/cli/fill_masterdata.py
def generate_collection_types(self) -> str:
    """
    Generate Python code for the collection types in the Openbis datamodel. The code is generated
    as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/collection_types.py`.

    Returns:
        str: Python code for the collection types.
    """
    lines = []
    class_names: dict = {}
    # from bam_masterdata.metadata.definitions import (
    #     CollectionTypeDef,
    #     PropertyTypeAssignment,
    # )
    if self.collections != {}:
        # Add imports at the top
        lines.append("from bam_masterdata.metadata.definitions import (")
        lines.append("    CollectionTypeDef,")
        lines.append("    PropertyTypeAssignment,")
        lines.append(")")
        lines.append("from bam_masterdata.metadata.entities import CollectionType")
        lines.append("")
        lines.append("")

    # Process each collection type
    for code, data in self.collections.items():
        # Skip the "UNKNOWN" object type
        if code == "UNKNOWN":
            continue

        # Determine parent class
        parent_code, parent_class, class_name = self.determine_parent_class(
            code=code,
            class_names=class_names,
            default="CollectionType",
            lines=lines,
        )

        # Add class definition
        lines.append(f"class {class_name}({parent_class}):")
        lines.append("    defs = CollectionTypeDef(")
        lines.append(f'        code="{code}",')
        description = (
            (data.get("description") or "")
            .replace('"', "`")
            .replace("\n", "\\n")
            .replace("'", "`")
        )
        lines.append(f'        description="""{description}""",')
        if data.get("validationPlugin") != "":
            lines.append(
                f'        validation_script="{data.get("validationPlugin")}",'
            )
        lines.append("    )")
        lines.append("")

        # Add properties
        self.add_properties(self.collections, parent_code, data, lines)
        # Add newline between classes
        lines.append("")

    return "\n".join(lines)

generate_dataset_types()

Generate Python code for the dataset types in the Openbis datamodel. The code is generated as a string which is then printed out to the specific Python module in bam_masterdata/datamodel/dataset_types.py.

RETURNS DESCRIPTION
str

Python code for the dataset types.

TYPE: str

Source code in bam_masterdata/cli/fill_masterdata.py
def generate_dataset_types(self) -> str:
    """
    Generate Python code for the dataset types in the Openbis datamodel. The code is generated
    as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/dataset_types.py`.

    Returns:
        str: Python code for the dataset types.
    """
    lines = []
    class_names: dict = {}

    if self.datasets != {}:
        # Add imports at the top
        lines.append(
            "from bam_masterdata.metadata.definitions import DatasetTypeDef, PropertyTypeAssignment"
        )
        lines.append("from bam_masterdata.metadata.entities import DatasetType")
        lines.append("")
        lines.append("")

    # Process each dataset type
    for code, data in self.datasets.items():
        # Skip the "UNKNOWN" object type
        if code == "UNKNOWN":
            continue

        # Determine parent class
        parent_code, parent_class, class_name = self.determine_parent_class(
            code=code, class_names=class_names, default="DatasetType", lines=lines
        )

        # Add class definition
        lines.append(f"class {class_name}({parent_class}):")
        lines.append("    defs = DatasetTypeDef(")
        lines.append(f'        code="{code}",')
        description = (
            (data.get("description") or "")
            .replace('"', "`")
            .replace("\n", "\\n")
            .replace("'", "`")
        )
        lines.append(f'        description="""{description}""",')
        lines.append("    )")
        lines.append("")

        # Add properties
        self.add_properties(self.datasets, parent_code, data, lines)
        # Add newline between classes
        lines.append("")

    return "\n".join(lines)

generate_object_types()

Generate Python code for the object types in the Openbis datamodel. The code is generated as a string which is then printed out to the specific Python module in bam_masterdata/datamodel/object_types.py.

RETURNS DESCRIPTION
str

Python code for the object types.

TYPE: str

Source code in bam_masterdata/cli/fill_masterdata.py
def generate_object_types(self) -> str:
    """
    Generate Python code for the object types in the Openbis datamodel. The code is generated
    as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/object_types.py`.

    Returns:
        str: Python code for the object types.
    """
    lines = []
    class_names: dict = {}

    if self.objects != {}:
        # Add imports at the top
        lines.append(
            "from bam_masterdata.metadata.definitions import ObjectTypeDef, PropertyTypeAssignment"
        )
        lines.append("from bam_masterdata.metadata.entities import ObjectType")
        lines.append("")
        lines.append("")

    # Process each object type
    for code, data in self.objects.items():
        # Skip the "UNKNOWN" object type
        if code == "UNKNOWN":
            continue

        # Determine parent class
        parent_code, parent_class, class_name = self.determine_parent_class(
            code=code, class_names=class_names, default="ObjectType", lines=lines
        )

        # Add class definition
        lines.append(f"class {class_name}({parent_class}):")
        lines.append("    defs = ObjectTypeDef(")
        lines.append(f'        code="{code}",')
        description = (
            (data.get("description") or "")
            .replace('"', "`")
            .replace("\n", "\\n")
            .replace("'", "`")
        )
        lines.append(f'        description="""{description}""",')
        lines.append(
            f'        generated_code_prefix="{data.get("generatedCodePrefix", "")}",'
        )
        lines.append("    )")
        lines.append("")

        # Add properties
        self.add_properties(self.objects, parent_code, data, lines)
        # Add newline between classes
        lines.append("")

    return "\n".join(lines)

generate_vocabulary_types()

Generate Python code for the vocabulary types in the Openbis datamodel. The code is generated as a string which is then printed out to the specific Python module in bam_masterdata/datamodel/vocabulary_types.py.

RETURNS DESCRIPTION
str

Python code for the vocabulary types.

TYPE: str

Source code in bam_masterdata/cli/fill_masterdata.py
def generate_vocabulary_types(self) -> str:
    """
    Generate Python code for the vocabulary types in the Openbis datamodel. The code is generated
    as a string which is then printed out to the specific Python module in `bam_masterdata/datamodel/vocabulary_types.py`.

    Returns:
        str: Python code for the vocabulary types.
    """
    lines = []
    class_names: dict = {}

    if self.vocabularies != {}:
        # Add imports at the top
        lines.append(
            "from bam_masterdata.metadata.definitions import VocabularyTerm, VocabularyTypeDef"
        )
        lines.append("from bam_masterdata.metadata.entities import VocabularyType")
        lines.append("")
        lines.append("")

    # Process each object type
    for code, data in self.vocabularies.items():
        # Skip the "UNKNOWN" object type
        if code == "UNKNOWN":
            continue

        # Determine parent class
        parent_code, parent_class, class_name = self.determine_parent_class(
            code=code,
            class_names=class_names,
            default="VocabularyType",
            lines=lines,
        )

        # Add class definition
        lines.append(f"class {class_name}({parent_class}):")
        lines.append("    defs = VocabularyTypeDef(")
        lines.append(f'        code="{code}",')
        description = (
            (data.get("description") or "")
            .replace('"', "`")
            .replace("\n", "\\n")
            .replace("'", "`")
        )
        lines.append(f'        description="""{description}""",')
        lines.append("    )")
        lines.append("")

        # Add terms
        parent_terms = self.objects.get(parent_code, {}).get("terms", {}).keys()
        for term_code, term_data in data.get("terms", {}).items():
            # Skip "UNKNOWN" properties
            if term_code == "UNKNOWN":
                continue

            # We check if the term is inherited from the parent class
            if term_code in parent_terms:
                continue

            term_name = (
                term_code.lstrip("$").replace(".", "_").replace("-", "_").lower()
            )
            if term_name[0].isdigit():
                term_name = f"_{term_name}"
            if term_name == "l":
                term_name = "L"
            if term_name == "O":
                term_name = "o"
            if term_name == "I":
                term_name = "i"
            lines.append(f"    {term_name} = VocabularyTerm(")
            lines.append(f'        code="{term_code}",')
            label = (term_data.get("label") or "").replace('"', "")
            lines.append(f'        label="{label}",')
            description = (
                (term_data.get("description") or "")
                .replace('"', "`")
                .replace("\n", "\\n")
                .replace("'", "`")
            )
            lines.append(f'        description="""{description}""",')
            lines.append("    )")
            lines.append("")

        # Add newline between classes
        lines.append("")

    return "\n".join(lines)

bam_masterdata.cli.excel_to_entities

MasterdataExcelExtractor

Source code in bam_masterdata/cli/excel_to_entities.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
class MasterdataExcelExtractor:
    # TODO move these validation rules to a separate json
    VALIDATION_RULES: dict[str, dict[str, dict[str, Any]]] = {}

    def __init__(self, excel_path: str, **kwargs):
        """Initialize the MasterdataExtractor."""
        self.excel_path = excel_path
        self.row_cell_info = kwargs.get("row_cell_info", False)
        self.workbook = openpyxl.load_workbook(excel_path)
        self.logger = kwargs.get("logger", logger)

        # Load validation rules at initialization
        if not MasterdataExcelExtractor.VALIDATION_RULES:
            self.VALIDATION_RULES = load_validation_rules(
                self.logger,
                os.path.join(VALIDATION_RULES_DIR, "excel_validation_rules.json"),
            )

    def index_to_excel_column(self, index: int) -> str:
        """
        Converts a 1-based index to an Excel column name.

        Args:
            index: The 1-based index to convert.

        Returns:
            The corresponding Excel column name.
        """
        if not index >= 1:
            raise ValueError("Index must be a positive integer starting from 1.")

        column = ""
        while index > 0:
            index, remainder = divmod(index - 1, 26)
            column = chr(65 + remainder) + column
        return column

    def get_last_non_empty_row(
        self, sheet: "Worksheet", start_index: int
    ) -> int | None:
        """
        Finds the last non-empty row before encountering a completely empty row.

        Args:
            sheet: The worksheet object.
            start_index: The row number to start checking from (1-based index).

        Returns:
            The row number of the last non-empty row before an empty row is encountered,
            or None if no non-empty rows are found starting from the given index.
        """
        if start_index < 1 or start_index > sheet.max_row:
            raise ValueError(
                f"Invalid start index: {start_index}. It must be between 1 and {sheet.max_row}."
            )

        last_non_empty_row = None
        for row in range(start_index, sheet.max_row + 1):
            if all(
                sheet.cell(row=row, column=col).value in (None, "")
                for col in range(1, sheet.max_column + 1)
            ):
                return last_non_empty_row  # Return the last non-empty row before the current empty row

            last_non_empty_row = row  # Update the last non-empty row

        return last_non_empty_row  # If no empty row is encountered, return the last non-empty row

    def str_to_bool(
        self,
        value: str | bool | None,
        term: str,
        coordinate: str,
        sheet_title: str,
    ) -> bool:
        """
        Converts a string to a boolean value.

        Args:
            value: The string to convert.

        Returns:
            The boolean value.
        """
        # No `value` provided
        if not value:
            return False

        val = str(value).strip().lower()
        if val not in ["true", "false"]:
            self.logger.error(
                f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}. Accepted values: TRUE or FALSE.",
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
        return val == "true"

    def get_and_check_property(
        self,
        value: str | bool | None,
        term: str,
        coordinate: str,
        sheet_title: str,
        is_description: bool = False,
        is_code: bool = False,
        is_data: bool = False,
        is_url: bool = False,
    ) -> str:
        """
        Gets a property and checks its format.

        Args:
            value: The string to convert.

        Returns:
            The property.
        """

        # No `value` provided
        if not value:
            return ""

        val = str(value)
        error_message = f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}."
        if is_description:
            if not re.match(r".*//.*", val):
                self.logger.error(
                    error_message
                    + "Description should follow the schema: English Description + '//' + German Description. ",
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        elif is_code:
            if not re.match(r"^\$?[A-Z0-9_.]+$", val):
                self.logger.error(
                    error_message,
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        elif is_data:
            if val not in [dt.value for dt in DataType]:
                self.logger.error(
                    error_message
                    + f"The Data Type should be one of the following: {[dt.value for dt in DataType]}",
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        elif is_url:
            if not re.match(
                r"https?://(?:www\.)?[a-zA-Z0-9-._~:/?#@!$&'()*+,;=%]+", val
            ):
                self.logger.error(
                    error_message,
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        else:
            if not re.match(r".*", val):
                self.logger.error(
                    error_message,
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        return val

    # Helper function to process each term
    def process_term(
        self, term: str, cell_value: Any, coordinate: str, sheet_title: str
    ) -> Any:
        """
        Processes a term by converting it to a boolean if necessary or checking its validity.

        Args:
            term: The term being processed.
            cell_value: The value of the cell.
            coordinate: The coordinate of the cell in the sheet.
            sheet_title: The title of the sheet.

        Returns:
            The processed value, either as a boolean or the original value after validation.
        """
        # Check if the term is a boolean type
        if term in ("Mandatory", "Show in edit views"):
            return self.str_to_bool(
                value=cell_value,
                term=term,
                coordinate=coordinate,
                sheet_title=sheet_title,
            )
        # Check and validate the property
        return self.get_and_check_property(
            value=cell_value,
            term=term,
            coordinate=coordinate,
            sheet_title=sheet_title,
            is_code=(term in ["Code", "Vocabulary code"]),
            is_data=(term == "Data type"),
        )

    def extract_value(
        self,
        sheet: "Worksheet",
        row: int,
        column: int,
        validation_pattern: str = None,
        is_description: bool = False,
        is_data: bool = False,
        is_url: bool = False,
    ) -> str:
        """
        Extracts and validates a value from a specified cell in the Excel sheet.

        Args:
            sheet: The worksheet object.
            row: The row number of the cell (1-based index).
            column: The column number of the cell (1-based index).
            validation_pattern: Optional regex pattern to validate the cell value.
            is_description: Flag indicating if the value is a description.
            is_data: Flag indicating if the value is a data type.
            is_url: Flag indicating if the value is a URL.

        Returns:
            The extracted and validated cell value as a string. Returns an empty string if the value is invalid or not provided.
        """
        value = sheet.cell(row=row, column=column).value

        # No `value` provided
        if not value:
            return ""

        validated = (
            bool(re.match(validation_pattern, str(value)))
            if validation_pattern
            else True
        )
        error_message = f"Invalid value '{value}' at row {row}, column {column} in sheet {sheet.title}"

        if is_description:
            error_message += " Description should follow the schema: English Description + '//' + German Description."
        elif is_data:
            validated = str(value) in [dt.value for dt in DataType]
            error_message += f" The Data Type should be one of the following: {[dt.value for dt in DataType]}"
        elif is_url:
            error_message += " It should be an URL or empty"

        if not validated:
            self.logger.error(
                error_message,
                cell_value=value,
                sheet_title=sheet.title,
                row=row,
                column=column,
            )

        return value or ""

    def process_entity(
        self,
        sheet: "Worksheet",
        start_index_row: int,
        header_terms: list[str],
        expected_terms: list[str],
        entity_type: str,
    ) -> dict[str, Any]:
        """
        Process an entity type block in the Excel sheet and return its attributes as a dictionary.

        Args:
            sheet: The worksheet object.
            start_index_row: The row where the current entity type begins (1-based index).
            header_terms: List of header terms in the entity block.
            expected_terms: List of expected terms to extract from the entity block.
            entity_type: The type of the entity (e.g., SAMPLE_TYPE, OBJECT_TYPE).

        Returns:
            A dictionary containing the attributes of the entity.
        """
        attributes: dict = {}
        cell_value: Any = ""

        for term in expected_terms:
            if term not in header_terms:
                self.logger.error(f"{term} not found in the headers.", term=term)
            else:
                term_index = header_terms.index(term)
                cell = sheet.cell(row=start_index_row + 2, column=term_index + 1)
                cell_value = self.extract_value(
                    sheet,
                    start_index_row + 2,
                    term_index + 1,
                    self.VALIDATION_RULES[entity_type][term].get("pattern"),
                )

                # Handle boolean conversion
                if self.VALIDATION_RULES[entity_type][term].get("is_bool"):
                    cell_value = self.str_to_bool(
                        value=cell_value,
                        term=term,
                        coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

                # Handle data type validation
                elif self.VALIDATION_RULES[entity_type][term].get("is_data"):
                    if cell_value not in [dt.value for dt in DataType]:
                        self.logger.error(
                            f"Invalid Data Type: {cell_value} in {cell.coordinate} (Sheet: {sheet.title}). Should be one of the following: {[dt.value for dt in DataType]}",
                            term=term,
                            cell_value=cell_value,
                            cell_coordinate=cell.coordinate,
                            sheet_title=sheet.title,
                        )

                # Handle additional validation for "Generated code prefix"
                elif (
                    self.VALIDATION_RULES[entity_type][term].get("extra_validation")
                    == "is_reduced_version"
                ):
                    if not is_reduced_version(cell_value, attributes.get("code", "")):
                        self.logger.warning(
                            f"Invalid {term} value '{cell_value}' in {cell.coordinate} (Sheet: {sheet.title}). "
                            f"Generated code prefix should be part of the 'Code' {attributes.get('code', '')}.",
                            term=term,
                            cell_value=cell_value,
                            cell_coordinate=cell.coordinate,
                            sheet_title=sheet.title,
                        )

                # Handle validation script (allows empty but must match pattern if provided)
                elif (
                    self.VALIDATION_RULES[entity_type][term].get("allow_empty")
                    and not cell_value
                ):
                    cell_value = None

                # Handle URL template validation (allows empty but must be a valid URL)
                elif (
                    self.VALIDATION_RULES[entity_type][term].get("is_url")
                    and cell_value
                ):
                    url_pattern = self.VALIDATION_RULES[entity_type][term].get(
                        "pattern"
                    )
                    if not re.match(url_pattern, str(cell_value)):
                        self.logger.error(
                            f"Invalid URL format: {cell_value} in {cell.coordinate} (Sheet: {sheet.title})",
                            cell_value=cell_value,
                            cell_coordinate=cell.coordinate,
                            sheet_title=sheet.title,
                        )

                # Add the extracted value to the attributes dictionary
                attributes[self.VALIDATION_RULES[entity_type][term].get("key")] = (
                    cell_value
                )

        if self.row_cell_info:
            attributes["row_location"] = f"A{start_index_row}"
        return attributes

    def properties_to_dict(
        self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
    ) -> dict[str, dict[str, Any]]:
        """
        Extracts properties from an Entity type block in the Excel sheet and returns them as a dictionary.

        Args:
            sheet: The worksheet object.
            start_index_row: Row where the current entity type begins (1-based index).
            last_non_empty_row: Row where the current entity type finish (1-based index).

        Returns:
            A dictionary where each key is a property code and the value is a dictionary
            containing the attributes of the property.
        """
        property_dict: dict = {}
        expected_terms = [
            "Code",
            "Description",
            "Mandatory",
            "Show in edit views",
            "Section",
            "Property label",
            "Data type",
            "Vocabulary code",
            "Metadata",
            "Dynamic script",
            # ! these are not used
            # "Unique",
            # "Internal assignment",
        ]

        # Determine the header row index
        header_index = start_index_row + 3
        row_headers = [(cell.value, cell.coordinate) for cell in sheet[header_index]]
        # And store how many properties are for the entity
        n_properties = last_non_empty_row - header_index
        if n_properties < 0:
            self.logger.error(
                f"No properties found for the entity in sheet {sheet.title} starting at row {start_index_row}."
            )
            return property_dict

        # Initialize a dictionary to store extracted columns
        extracted_columns: dict[str, list] = {term: [] for term in expected_terms}
        if self.row_cell_info:
            extracted_columns["row_location"] = []

        # Extract columns for each expected term
        for i, (term, coordinate) in enumerate(row_headers):
            if term not in expected_terms:
                log_func = (
                    self.logger.warning
                    if term
                    in (
                        "Mandatory",
                        "Show in edit views",
                        "Section",
                        "Metadata",
                        "Dynamic script",
                        "Vocabulary code",
                        # ! these are not used
                        # "Unique",
                        # "Internal assignment",
                    )
                    else self.logger.error
                )
                log_func(f"'{term}' not found in the properties headers.", term=term)
                continue

            # Excel column letter from the coordinate
            term_letter = coordinate[0]

            # Extract values from the column
            for cell_property in sheet[term_letter][header_index:last_non_empty_row]:
                extracted_columns[term].append(
                    self.process_term(
                        term, cell_property.value, cell_property.coordinate, sheet.title
                    )
                )
                if self.row_cell_info:
                    extracted_columns["row_location"].append(cell_property.coordinate)

        # Combine extracted values into a dictionary
        for i in range(n_properties):
            code = extracted_columns.get("Code", [])
            if not code:
                self.logger.error(
                    f"'Code' not found in the properties headers for sheet {sheet.title}."
                )
                return property_dict
            code = code[i]
            property_dict[code] = {"permId": code, "code": code}
            for key, pybis_val in {
                "Description": "description",
                "Section": "section",
                "Mandatory": "mandatory",
                "Show in edit views": "show_in_edit_views",
                "Property label": "label",
                "Data type": "dataType",
                "Vocabulary code": "vocabularyCode",
            }.items():
                data_column = extracted_columns.get(key, [])
                if not data_column:
                    continue
                property_dict[code][pybis_val] = data_column[i]
            if self.row_cell_info:
                property_dict[code]["row_location"] = (
                    extracted_columns.get("row_location")[i],
                )
            # Only add optional fields if they exist in extracted_columns
            optional_fields = [
                "Metadata",
                "Dynamic script",
                "Unique",
                "Internal assignment",
            ]
            for field in optional_fields:
                if (
                    field in extracted_columns
                ):  # Check if the field exists in the extracted columns
                    if extracted_columns[field][i] == "":
                        extracted_columns[field][i] = None
                    property_dict[extracted_columns["Code"][i]][
                        field.lower().replace(" ", "_")
                    ] = extracted_columns[field][i]

        return property_dict

    def terms_to_dict(
        self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
    ) -> dict[str, dict[str, Any]]:
        """
        Extracts terms from a Vocabulary block in the Excel sheet and returns them as a dictionary.

        Args:
            sheet: The worksheet object.
            start_index_row: Row where the current entity type begins (1-based index).
            last_non_empty_row: Row where the current entity type finish (1-based index).

        Returns:
            A dictionary where each key is a vocabulary term code and the value is a dictionary
            containing the attributes of the vocabulary term.
        """
        terms_dict = {}
        expected_terms = ["Code", "Description", "Url template", "Label", "Official"]

        header_index = start_index_row + 3
        row_headers = [cell.value for cell in sheet[header_index]]

        # Initialize a dictionary to store extracted columns
        extracted_columns: dict[str, list] = {term: [] for term in expected_terms}

        # Helper function to process each term
        def process_term_cell(term, cell_value, coordinate, sheet_title):
            if term == "Official":
                return self.str_to_bool(
                    value=cell_value,
                    term=term,
                    coordinate=coordinate,
                    sheet_title=sheet_title,
                )
            return self.get_and_check_property(
                value=cell_value,
                term=term,
                coordinate=coordinate,
                sheet_title=sheet_title,
                is_code=(term == "Code"),
                is_url=(term == "Url template"),
            )

        # Extract columns for each expected term
        for term in expected_terms:
            if term not in row_headers:
                self.logger.warning(
                    f"{term} not found in the properties headers.", term=term
                )
                continue

            # Get column index and Excel letter
            term_index = row_headers.index(term) + 1
            term_letter = self.index_to_excel_column(term_index)

            # Extract values from the column
            for cell in sheet[term_letter][header_index:last_non_empty_row]:
                extracted_columns[term].append(
                    process_term_cell(term, cell.value, cell.coordinate, sheet.title)
                )

        # Combine extracted values into a dictionary
        for i in range(len(extracted_columns["Code"])):
            terms_dict[extracted_columns["Code"][i]] = {
                "permId": extracted_columns["Code"][i],
                "code": extracted_columns["Code"][i],
            }
            for attr_key in ["Description", "Url template", "Label", "Official"]:
                if extracted_columns.get(attr_key):
                    value = extracted_columns[attr_key][i]
                    terms_dict[extracted_columns["Code"][i]][attr_key] = value

        return terms_dict

    def block_to_entity_dict(
        self,
        sheet: "Worksheet",
        start_index_row: int,
        last_non_empty_row: int,
        complete_dict: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Extracts entity attributes from an Excel sheet block and returns them as a dictionary.
        """
        attributes_dict: dict = {}

        # Get the entity type
        entity_type = sheet[f"A{start_index_row}"].value
        if entity_type not in self.VALIDATION_RULES:
            raise ValueError(f"Invalid entity type: {entity_type}")

        # Get the header terms
        header_terms = [cell.value for cell in sheet[start_index_row + 1]]

        # Process entity data using the helper function
        attributes_dict = self.process_entity(
            sheet,
            start_index_row,
            header_terms,
            list(self.VALIDATION_RULES[entity_type].keys()),
            entity_type,
        )

        # Extract additional attributes if necessary
        if entity_type in {
            "SAMPLE_TYPE",
            "OBJECT_TYPE",
            "EXPERIMENT_TYPE",
            "DATASET_TYPE",
        }:
            attributes_dict["properties"] = (
                self.properties_to_dict(sheet, start_index_row, last_non_empty_row)
                or {}
            )

        elif entity_type == "VOCABULARY_TYPE":
            attributes_dict["terms"] = (
                self.terms_to_dict(sheet, start_index_row, last_non_empty_row) or {}
            )

        # Add the entity to the complete dictionary
        complete_dict[attributes_dict["code"]] = attributes_dict

        # Return sorted dictionary
        return dict(sorted(complete_dict.items(), key=lambda item: item[0].count(".")))

    def excel_to_entities(self) -> dict[str, dict[str, Any]]:
        """
        Extracts entities from an Excel file and returns them as a dictionary.

        Returns:
            dict[str, dict[str, Any]]: A dictionary where each key is a normalized sheet name and the value is a dictionary
            containing the extracted entities. Returns an empty dictionary if all sheets are empty.
        """
        sheets_dict: dict[str, dict[str, Any]] = {}
        sheet_names = self.workbook.sheetnames
        has_content = False  # Track if any sheet has valid content

        for i, sheet_name in enumerate(sheet_names):
            normalized_sheet_name = sheet_name.lower().replace(" ", "_")
            sheet = self.workbook[sheet_name]
            start_row = 1

            # **Check if the sheet is empty**
            if all(
                sheet.cell(row=row, column=col).value in (None, "")
                for row in range(1, sheet.max_row + 1)
                for col in range(1, sheet.max_column + 1)
            ):
                self.logger.info(f"Skipping empty sheet: {sheet_name}")
                continue  # Move to the next sheet

            sheets_dict[normalized_sheet_name] = {}

            consecutive_empty_rows = 0  # Track consecutive empty rows
            while start_row <= sheet.max_row:
                # **Check for two consecutive empty rows**
                is_row_empty = all(
                    sheet.cell(row=start_row, column=col).value in (None, "")
                    for col in range(1, sheet.max_column + 1)
                )

                if is_row_empty:
                    consecutive_empty_rows += 1
                    if consecutive_empty_rows >= 2:
                        # **Reached the end of the sheet, move to the next**
                        if i == len(sheet_names) - 1:
                            self.logger.info(
                                f"Last sheet {sheet_name} processed. End of the file reached."
                            )
                        else:
                            self.logger.info(
                                f"End of the current sheet {sheet_name} reached. Switching to next sheet..."
                            )
                        break  # Stop processing this sheet
                else:
                    consecutive_empty_rows = 0  # Reset if we find a non-empty row

                    # **Process the entity block**
                    last_non_empty_row = self.get_last_non_empty_row(sheet, start_row)
                    if last_non_empty_row is None:
                        break  # No more valid blocks

                    sheets_dict[normalized_sheet_name] = self.block_to_entity_dict(
                        sheet,
                        start_row,
                        last_non_empty_row,
                        sheets_dict[normalized_sheet_name],
                    )
                    has_content = True  # Found valid content

                    # Move to the next entity block
                    start_row = last_non_empty_row + 1
                    continue  # Continue loop without increasing consecutive_empty_rows

                start_row += 1  # Move to the next row

        # **If no sheets had content, return an empty dictionary**
        if not has_content:
            self.logger.warning(
                "No valid data found in any sheets. Returning empty dictionary."
            )
            return {}

        return sheets_dict

VALIDATION_RULES = {}

excel_path = excel_path

row_cell_info = kwargs.get('row_cell_info', False)

workbook = openpyxl.load_workbook(excel_path)

logger = kwargs.get('logger', logger)

__init__(excel_path, **kwargs)

Initialize the MasterdataExtractor.

Source code in bam_masterdata/cli/excel_to_entities.py
def __init__(self, excel_path: str, **kwargs):
    """Initialize the MasterdataExtractor."""
    self.excel_path = excel_path
    self.row_cell_info = kwargs.get("row_cell_info", False)
    self.workbook = openpyxl.load_workbook(excel_path)
    self.logger = kwargs.get("logger", logger)

    # Load validation rules at initialization
    if not MasterdataExcelExtractor.VALIDATION_RULES:
        self.VALIDATION_RULES = load_validation_rules(
            self.logger,
            os.path.join(VALIDATION_RULES_DIR, "excel_validation_rules.json"),
        )

index_to_excel_column(index)

Converts a 1-based index to an Excel column name.

PARAMETER DESCRIPTION
index

The 1-based index to convert.

TYPE: int

RETURNS DESCRIPTION
str

The corresponding Excel column name.

Source code in bam_masterdata/cli/excel_to_entities.py
def index_to_excel_column(self, index: int) -> str:
    """
    Converts a 1-based index to an Excel column name.

    Args:
        index: The 1-based index to convert.

    Returns:
        The corresponding Excel column name.
    """
    if not index >= 1:
        raise ValueError("Index must be a positive integer starting from 1.")

    column = ""
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        column = chr(65 + remainder) + column
    return column

get_last_non_empty_row(sheet, start_index)

Finds the last non-empty row before encountering a completely empty row.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index

The row number to start checking from (1-based index).

TYPE: int

RETURNS DESCRIPTION
int | None

The row number of the last non-empty row before an empty row is encountered,

int | None

or None if no non-empty rows are found starting from the given index.

Source code in bam_masterdata/cli/excel_to_entities.py
def get_last_non_empty_row(
    self, sheet: "Worksheet", start_index: int
) -> int | None:
    """
    Finds the last non-empty row before encountering a completely empty row.

    Args:
        sheet: The worksheet object.
        start_index: The row number to start checking from (1-based index).

    Returns:
        The row number of the last non-empty row before an empty row is encountered,
        or None if no non-empty rows are found starting from the given index.
    """
    if start_index < 1 or start_index > sheet.max_row:
        raise ValueError(
            f"Invalid start index: {start_index}. It must be between 1 and {sheet.max_row}."
        )

    last_non_empty_row = None
    for row in range(start_index, sheet.max_row + 1):
        if all(
            sheet.cell(row=row, column=col).value in (None, "")
            for col in range(1, sheet.max_column + 1)
        ):
            return last_non_empty_row  # Return the last non-empty row before the current empty row

        last_non_empty_row = row  # Update the last non-empty row

    return last_non_empty_row  # If no empty row is encountered, return the last non-empty row

str_to_bool(value, term, coordinate, sheet_title)

Converts a string to a boolean value.

PARAMETER DESCRIPTION
value

The string to convert.

TYPE: str | bool | None

RETURNS DESCRIPTION
bool

The boolean value.

Source code in bam_masterdata/cli/excel_to_entities.py
def str_to_bool(
    self,
    value: str | bool | None,
    term: str,
    coordinate: str,
    sheet_title: str,
) -> bool:
    """
    Converts a string to a boolean value.

    Args:
        value: The string to convert.

    Returns:
        The boolean value.
    """
    # No `value` provided
    if not value:
        return False

    val = str(value).strip().lower()
    if val not in ["true", "false"]:
        self.logger.error(
            f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}. Accepted values: TRUE or FALSE.",
            term=term,
            cell_value=val,
            cell_coordinate=coordinate,
            sheet_title=sheet_title,
        )
    return val == "true"

get_and_check_property(value, term, coordinate, sheet_title, is_description=False, is_code=False, is_data=False, is_url=False)

Gets a property and checks its format.

PARAMETER DESCRIPTION
value

The string to convert.

TYPE: str | bool | None

RETURNS DESCRIPTION
str

The property.

Source code in bam_masterdata/cli/excel_to_entities.py
def get_and_check_property(
    self,
    value: str | bool | None,
    term: str,
    coordinate: str,
    sheet_title: str,
    is_description: bool = False,
    is_code: bool = False,
    is_data: bool = False,
    is_url: bool = False,
) -> str:
    """
    Gets a property and checks its format.

    Args:
        value: The string to convert.

    Returns:
        The property.
    """

    # No `value` provided
    if not value:
        return ""

    val = str(value)
    error_message = f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}."
    if is_description:
        if not re.match(r".*//.*", val):
            self.logger.error(
                error_message
                + "Description should follow the schema: English Description + '//' + German Description. ",
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    elif is_code:
        if not re.match(r"^\$?[A-Z0-9_.]+$", val):
            self.logger.error(
                error_message,
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    elif is_data:
        if val not in [dt.value for dt in DataType]:
            self.logger.error(
                error_message
                + f"The Data Type should be one of the following: {[dt.value for dt in DataType]}",
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    elif is_url:
        if not re.match(
            r"https?://(?:www\.)?[a-zA-Z0-9-._~:/?#@!$&'()*+,;=%]+", val
        ):
            self.logger.error(
                error_message,
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    else:
        if not re.match(r".*", val):
            self.logger.error(
                error_message,
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    return val

process_term(term, cell_value, coordinate, sheet_title)

Processes a term by converting it to a boolean if necessary or checking its validity.

PARAMETER DESCRIPTION
term

The term being processed.

TYPE: str

cell_value

The value of the cell.

TYPE: Any

coordinate

The coordinate of the cell in the sheet.

TYPE: str

sheet_title

The title of the sheet.

TYPE: str

RETURNS DESCRIPTION
Any

The processed value, either as a boolean or the original value after validation.

Source code in bam_masterdata/cli/excel_to_entities.py
def process_term(
    self, term: str, cell_value: Any, coordinate: str, sheet_title: str
) -> Any:
    """
    Processes a term by converting it to a boolean if necessary or checking its validity.

    Args:
        term: The term being processed.
        cell_value: The value of the cell.
        coordinate: The coordinate of the cell in the sheet.
        sheet_title: The title of the sheet.

    Returns:
        The processed value, either as a boolean or the original value after validation.
    """
    # Check if the term is a boolean type
    if term in ("Mandatory", "Show in edit views"):
        return self.str_to_bool(
            value=cell_value,
            term=term,
            coordinate=coordinate,
            sheet_title=sheet_title,
        )
    # Check and validate the property
    return self.get_and_check_property(
        value=cell_value,
        term=term,
        coordinate=coordinate,
        sheet_title=sheet_title,
        is_code=(term in ["Code", "Vocabulary code"]),
        is_data=(term == "Data type"),
    )

extract_value(sheet, row, column, validation_pattern=None, is_description=False, is_data=False, is_url=False)

Extracts and validates a value from a specified cell in the Excel sheet.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

row

The row number of the cell (1-based index).

TYPE: int

column

The column number of the cell (1-based index).

TYPE: int

validation_pattern

Optional regex pattern to validate the cell value.

TYPE: str DEFAULT: None

is_description

Flag indicating if the value is a description.

TYPE: bool DEFAULT: False

is_data

Flag indicating if the value is a data type.

TYPE: bool DEFAULT: False

is_url

Flag indicating if the value is a URL.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

The extracted and validated cell value as a string. Returns an empty string if the value is invalid or not provided.

Source code in bam_masterdata/cli/excel_to_entities.py
def extract_value(
    self,
    sheet: "Worksheet",
    row: int,
    column: int,
    validation_pattern: str = None,
    is_description: bool = False,
    is_data: bool = False,
    is_url: bool = False,
) -> str:
    """
    Extracts and validates a value from a specified cell in the Excel sheet.

    Args:
        sheet: The worksheet object.
        row: The row number of the cell (1-based index).
        column: The column number of the cell (1-based index).
        validation_pattern: Optional regex pattern to validate the cell value.
        is_description: Flag indicating if the value is a description.
        is_data: Flag indicating if the value is a data type.
        is_url: Flag indicating if the value is a URL.

    Returns:
        The extracted and validated cell value as a string. Returns an empty string if the value is invalid or not provided.
    """
    value = sheet.cell(row=row, column=column).value

    # No `value` provided
    if not value:
        return ""

    validated = (
        bool(re.match(validation_pattern, str(value)))
        if validation_pattern
        else True
    )
    error_message = f"Invalid value '{value}' at row {row}, column {column} in sheet {sheet.title}"

    if is_description:
        error_message += " Description should follow the schema: English Description + '//' + German Description."
    elif is_data:
        validated = str(value) in [dt.value for dt in DataType]
        error_message += f" The Data Type should be one of the following: {[dt.value for dt in DataType]}"
    elif is_url:
        error_message += " It should be an URL or empty"

    if not validated:
        self.logger.error(
            error_message,
            cell_value=value,
            sheet_title=sheet.title,
            row=row,
            column=column,
        )

    return value or ""

process_entity(sheet, start_index_row, header_terms, expected_terms, entity_type)

Process an entity type block in the Excel sheet and return its attributes as a dictionary.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index_row

The row where the current entity type begins (1-based index).

TYPE: int

header_terms

List of header terms in the entity block.

TYPE: list[str]

expected_terms

List of expected terms to extract from the entity block.

TYPE: list[str]

entity_type

The type of the entity (e.g., SAMPLE_TYPE, OBJECT_TYPE).

TYPE: str

RETURNS DESCRIPTION
dict[str, Any]

A dictionary containing the attributes of the entity.

Source code in bam_masterdata/cli/excel_to_entities.py
def process_entity(
    self,
    sheet: "Worksheet",
    start_index_row: int,
    header_terms: list[str],
    expected_terms: list[str],
    entity_type: str,
) -> dict[str, Any]:
    """
    Process an entity type block in the Excel sheet and return its attributes as a dictionary.

    Args:
        sheet: The worksheet object.
        start_index_row: The row where the current entity type begins (1-based index).
        header_terms: List of header terms in the entity block.
        expected_terms: List of expected terms to extract from the entity block.
        entity_type: The type of the entity (e.g., SAMPLE_TYPE, OBJECT_TYPE).

    Returns:
        A dictionary containing the attributes of the entity.
    """
    attributes: dict = {}
    cell_value: Any = ""

    for term in expected_terms:
        if term not in header_terms:
            self.logger.error(f"{term} not found in the headers.", term=term)
        else:
            term_index = header_terms.index(term)
            cell = sheet.cell(row=start_index_row + 2, column=term_index + 1)
            cell_value = self.extract_value(
                sheet,
                start_index_row + 2,
                term_index + 1,
                self.VALIDATION_RULES[entity_type][term].get("pattern"),
            )

            # Handle boolean conversion
            if self.VALIDATION_RULES[entity_type][term].get("is_bool"):
                cell_value = self.str_to_bool(
                    value=cell_value,
                    term=term,
                    coordinate=cell.coordinate,
                    sheet_title=sheet.title,
                )

            # Handle data type validation
            elif self.VALIDATION_RULES[entity_type][term].get("is_data"):
                if cell_value not in [dt.value for dt in DataType]:
                    self.logger.error(
                        f"Invalid Data Type: {cell_value} in {cell.coordinate} (Sheet: {sheet.title}). Should be one of the following: {[dt.value for dt in DataType]}",
                        term=term,
                        cell_value=cell_value,
                        cell_coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

            # Handle additional validation for "Generated code prefix"
            elif (
                self.VALIDATION_RULES[entity_type][term].get("extra_validation")
                == "is_reduced_version"
            ):
                if not is_reduced_version(cell_value, attributes.get("code", "")):
                    self.logger.warning(
                        f"Invalid {term} value '{cell_value}' in {cell.coordinate} (Sheet: {sheet.title}). "
                        f"Generated code prefix should be part of the 'Code' {attributes.get('code', '')}.",
                        term=term,
                        cell_value=cell_value,
                        cell_coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

            # Handle validation script (allows empty but must match pattern if provided)
            elif (
                self.VALIDATION_RULES[entity_type][term].get("allow_empty")
                and not cell_value
            ):
                cell_value = None

            # Handle URL template validation (allows empty but must be a valid URL)
            elif (
                self.VALIDATION_RULES[entity_type][term].get("is_url")
                and cell_value
            ):
                url_pattern = self.VALIDATION_RULES[entity_type][term].get(
                    "pattern"
                )
                if not re.match(url_pattern, str(cell_value)):
                    self.logger.error(
                        f"Invalid URL format: {cell_value} in {cell.coordinate} (Sheet: {sheet.title})",
                        cell_value=cell_value,
                        cell_coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

            # Add the extracted value to the attributes dictionary
            attributes[self.VALIDATION_RULES[entity_type][term].get("key")] = (
                cell_value
            )

    if self.row_cell_info:
        attributes["row_location"] = f"A{start_index_row}"
    return attributes

properties_to_dict(sheet, start_index_row, last_non_empty_row)

Extracts properties from an Entity type block in the Excel sheet and returns them as a dictionary.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index_row

Row where the current entity type begins (1-based index).

TYPE: int

last_non_empty_row

Row where the current entity type finish (1-based index).

TYPE: int

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

A dictionary where each key is a property code and the value is a dictionary

dict[str, dict[str, Any]]

containing the attributes of the property.

Source code in bam_masterdata/cli/excel_to_entities.py
def properties_to_dict(
    self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
) -> dict[str, dict[str, Any]]:
    """
    Extracts properties from an Entity type block in the Excel sheet and returns them as a dictionary.

    Args:
        sheet: The worksheet object.
        start_index_row: Row where the current entity type begins (1-based index).
        last_non_empty_row: Row where the current entity type finish (1-based index).

    Returns:
        A dictionary where each key is a property code and the value is a dictionary
        containing the attributes of the property.
    """
    property_dict: dict = {}
    expected_terms = [
        "Code",
        "Description",
        "Mandatory",
        "Show in edit views",
        "Section",
        "Property label",
        "Data type",
        "Vocabulary code",
        "Metadata",
        "Dynamic script",
        # ! these are not used
        # "Unique",
        # "Internal assignment",
    ]

    # Determine the header row index
    header_index = start_index_row + 3
    row_headers = [(cell.value, cell.coordinate) for cell in sheet[header_index]]
    # And store how many properties are for the entity
    n_properties = last_non_empty_row - header_index
    if n_properties < 0:
        self.logger.error(
            f"No properties found for the entity in sheet {sheet.title} starting at row {start_index_row}."
        )
        return property_dict

    # Initialize a dictionary to store extracted columns
    extracted_columns: dict[str, list] = {term: [] for term in expected_terms}
    if self.row_cell_info:
        extracted_columns["row_location"] = []

    # Extract columns for each expected term
    for i, (term, coordinate) in enumerate(row_headers):
        if term not in expected_terms:
            log_func = (
                self.logger.warning
                if term
                in (
                    "Mandatory",
                    "Show in edit views",
                    "Section",
                    "Metadata",
                    "Dynamic script",
                    "Vocabulary code",
                    # ! these are not used
                    # "Unique",
                    # "Internal assignment",
                )
                else self.logger.error
            )
            log_func(f"'{term}' not found in the properties headers.", term=term)
            continue

        # Excel column letter from the coordinate
        term_letter = coordinate[0]

        # Extract values from the column
        for cell_property in sheet[term_letter][header_index:last_non_empty_row]:
            extracted_columns[term].append(
                self.process_term(
                    term, cell_property.value, cell_property.coordinate, sheet.title
                )
            )
            if self.row_cell_info:
                extracted_columns["row_location"].append(cell_property.coordinate)

    # Combine extracted values into a dictionary
    for i in range(n_properties):
        code = extracted_columns.get("Code", [])
        if not code:
            self.logger.error(
                f"'Code' not found in the properties headers for sheet {sheet.title}."
            )
            return property_dict
        code = code[i]
        property_dict[code] = {"permId": code, "code": code}
        for key, pybis_val in {
            "Description": "description",
            "Section": "section",
            "Mandatory": "mandatory",
            "Show in edit views": "show_in_edit_views",
            "Property label": "label",
            "Data type": "dataType",
            "Vocabulary code": "vocabularyCode",
        }.items():
            data_column = extracted_columns.get(key, [])
            if not data_column:
                continue
            property_dict[code][pybis_val] = data_column[i]
        if self.row_cell_info:
            property_dict[code]["row_location"] = (
                extracted_columns.get("row_location")[i],
            )
        # Only add optional fields if they exist in extracted_columns
        optional_fields = [
            "Metadata",
            "Dynamic script",
            "Unique",
            "Internal assignment",
        ]
        for field in optional_fields:
            if (
                field in extracted_columns
            ):  # Check if the field exists in the extracted columns
                if extracted_columns[field][i] == "":
                    extracted_columns[field][i] = None
                property_dict[extracted_columns["Code"][i]][
                    field.lower().replace(" ", "_")
                ] = extracted_columns[field][i]

    return property_dict

terms_to_dict(sheet, start_index_row, last_non_empty_row)

Extracts terms from a Vocabulary block in the Excel sheet and returns them as a dictionary.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index_row

Row where the current entity type begins (1-based index).

TYPE: int

last_non_empty_row

Row where the current entity type finish (1-based index).

TYPE: int

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

A dictionary where each key is a vocabulary term code and the value is a dictionary

dict[str, dict[str, Any]]

containing the attributes of the vocabulary term.

Source code in bam_masterdata/cli/excel_to_entities.py
def terms_to_dict(
    self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
) -> dict[str, dict[str, Any]]:
    """
    Extracts terms from a Vocabulary block in the Excel sheet and returns them as a dictionary.

    Args:
        sheet: The worksheet object.
        start_index_row: Row where the current entity type begins (1-based index).
        last_non_empty_row: Row where the current entity type finish (1-based index).

    Returns:
        A dictionary where each key is a vocabulary term code and the value is a dictionary
        containing the attributes of the vocabulary term.
    """
    terms_dict = {}
    expected_terms = ["Code", "Description", "Url template", "Label", "Official"]

    header_index = start_index_row + 3
    row_headers = [cell.value for cell in sheet[header_index]]

    # Initialize a dictionary to store extracted columns
    extracted_columns: dict[str, list] = {term: [] for term in expected_terms}

    # Helper function to process each term
    def process_term_cell(term, cell_value, coordinate, sheet_title):
        if term == "Official":
            return self.str_to_bool(
                value=cell_value,
                term=term,
                coordinate=coordinate,
                sheet_title=sheet_title,
            )
        return self.get_and_check_property(
            value=cell_value,
            term=term,
            coordinate=coordinate,
            sheet_title=sheet_title,
            is_code=(term == "Code"),
            is_url=(term == "Url template"),
        )

    # Extract columns for each expected term
    for term in expected_terms:
        if term not in row_headers:
            self.logger.warning(
                f"{term} not found in the properties headers.", term=term
            )
            continue

        # Get column index and Excel letter
        term_index = row_headers.index(term) + 1
        term_letter = self.index_to_excel_column(term_index)

        # Extract values from the column
        for cell in sheet[term_letter][header_index:last_non_empty_row]:
            extracted_columns[term].append(
                process_term_cell(term, cell.value, cell.coordinate, sheet.title)
            )

    # Combine extracted values into a dictionary
    for i in range(len(extracted_columns["Code"])):
        terms_dict[extracted_columns["Code"][i]] = {
            "permId": extracted_columns["Code"][i],
            "code": extracted_columns["Code"][i],
        }
        for attr_key in ["Description", "Url template", "Label", "Official"]:
            if extracted_columns.get(attr_key):
                value = extracted_columns[attr_key][i]
                terms_dict[extracted_columns["Code"][i]][attr_key] = value

    return terms_dict

block_to_entity_dict(sheet, start_index_row, last_non_empty_row, complete_dict)

Extracts entity attributes from an Excel sheet block and returns them as a dictionary.

Source code in bam_masterdata/cli/excel_to_entities.py
def block_to_entity_dict(
    self,
    sheet: "Worksheet",
    start_index_row: int,
    last_non_empty_row: int,
    complete_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Extracts entity attributes from an Excel sheet block and returns them as a dictionary.
    """
    attributes_dict: dict = {}

    # Get the entity type
    entity_type = sheet[f"A{start_index_row}"].value
    if entity_type not in self.VALIDATION_RULES:
        raise ValueError(f"Invalid entity type: {entity_type}")

    # Get the header terms
    header_terms = [cell.value for cell in sheet[start_index_row + 1]]

    # Process entity data using the helper function
    attributes_dict = self.process_entity(
        sheet,
        start_index_row,
        header_terms,
        list(self.VALIDATION_RULES[entity_type].keys()),
        entity_type,
    )

    # Extract additional attributes if necessary
    if entity_type in {
        "SAMPLE_TYPE",
        "OBJECT_TYPE",
        "EXPERIMENT_TYPE",
        "DATASET_TYPE",
    }:
        attributes_dict["properties"] = (
            self.properties_to_dict(sheet, start_index_row, last_non_empty_row)
            or {}
        )

    elif entity_type == "VOCABULARY_TYPE":
        attributes_dict["terms"] = (
            self.terms_to_dict(sheet, start_index_row, last_non_empty_row) or {}
        )

    # Add the entity to the complete dictionary
    complete_dict[attributes_dict["code"]] = attributes_dict

    # Return sorted dictionary
    return dict(sorted(complete_dict.items(), key=lambda item: item[0].count(".")))

excel_to_entities()

Extracts entities from an Excel file and returns them as a dictionary.

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

dict[str, dict[str, Any]]: A dictionary where each key is a normalized sheet name and the value is a dictionary

dict[str, dict[str, Any]]

containing the extracted entities. Returns an empty dictionary if all sheets are empty.

Source code in bam_masterdata/cli/excel_to_entities.py
def excel_to_entities(self) -> dict[str, dict[str, Any]]:
    """
    Extracts entities from an Excel file and returns them as a dictionary.

    Returns:
        dict[str, dict[str, Any]]: A dictionary where each key is a normalized sheet name and the value is a dictionary
        containing the extracted entities. Returns an empty dictionary if all sheets are empty.
    """
    sheets_dict: dict[str, dict[str, Any]] = {}
    sheet_names = self.workbook.sheetnames
    has_content = False  # Track if any sheet has valid content

    for i, sheet_name in enumerate(sheet_names):
        normalized_sheet_name = sheet_name.lower().replace(" ", "_")
        sheet = self.workbook[sheet_name]
        start_row = 1

        # **Check if the sheet is empty**
        if all(
            sheet.cell(row=row, column=col).value in (None, "")
            for row in range(1, sheet.max_row + 1)
            for col in range(1, sheet.max_column + 1)
        ):
            self.logger.info(f"Skipping empty sheet: {sheet_name}")
            continue  # Move to the next sheet

        sheets_dict[normalized_sheet_name] = {}

        consecutive_empty_rows = 0  # Track consecutive empty rows
        while start_row <= sheet.max_row:
            # **Check for two consecutive empty rows**
            is_row_empty = all(
                sheet.cell(row=start_row, column=col).value in (None, "")
                for col in range(1, sheet.max_column + 1)
            )

            if is_row_empty:
                consecutive_empty_rows += 1
                if consecutive_empty_rows >= 2:
                    # **Reached the end of the sheet, move to the next**
                    if i == len(sheet_names) - 1:
                        self.logger.info(
                            f"Last sheet {sheet_name} processed. End of the file reached."
                        )
                    else:
                        self.logger.info(
                            f"End of the current sheet {sheet_name} reached. Switching to next sheet..."
                        )
                    break  # Stop processing this sheet
            else:
                consecutive_empty_rows = 0  # Reset if we find a non-empty row

                # **Process the entity block**
                last_non_empty_row = self.get_last_non_empty_row(sheet, start_row)
                if last_non_empty_row is None:
                    break  # No more valid blocks

                sheets_dict[normalized_sheet_name] = self.block_to_entity_dict(
                    sheet,
                    start_row,
                    last_non_empty_row,
                    sheets_dict[normalized_sheet_name],
                )
                has_content = True  # Found valid content

                # Move to the next entity block
                start_row = last_non_empty_row + 1
                continue  # Continue loop without increasing consecutive_empty_rows

            start_row += 1  # Move to the next row

    # **If no sheets had content, return an empty dictionary**
    if not has_content:
        self.logger.warning(
            "No valid data found in any sheets. Returning empty dictionary."
        )
        return {}

    return sheets_dict

bam_masterdata.cli.entities_to_excel

entities_to_excel(worksheet, module_path, definitions_module)

Export entities to the Excel file. The Python modules are imported using the function import_module, and their contents are inspected (using inspect) to find the classes in the datamodel containing defs and with a model_to_json method defined. Each row is then appended to the worksheet.

PARAMETER DESCRIPTION
worksheet

The worksheet to append the entities.

TYPE: Worksheet

module_path

Path to the Python module file.

TYPE: str

definitions_module

The module containing the definitions of the entities. This is used to match the header definitions of the entities.

TYPE: Any

Source code in bam_masterdata/cli/entities_to_excel.py
def entities_to_excel(
    worksheet: "Worksheet",
    module_path: str,
    definitions_module: Any,
) -> None:
    """
    Export entities to the Excel file. The Python modules are imported using the function `import_module`,
    and their contents are inspected (using `inspect`) to find the classes in the datamodel containing
    `defs` and with a `model_to_json` method defined. Each row is then appended to the `worksheet`.

    Args:
        worksheet (Worksheet): The worksheet to append the entities.
        module_path (str): Path to the Python module file.
        definitions_module (Any): The module containing the definitions of the entities. This is used
            to match the header definitions of the entities.
    """
    def_members = inspect.getmembers(definitions_module, inspect.isclass)
    module = import_module(module_path=module_path)

    # Inspect Python modules and their objects and print them to Excel
    for _, obj in inspect.getmembers(module, inspect.isclass):
        # Ensure the class has the `model_to_json` method
        if not hasattr(obj, "defs") or not callable(getattr(obj, "model_to_json")):
            continue

        obj_instance = obj()

        # Entity title
        obj_definitions = obj_instance.defs
        worksheet.append([obj_definitions.excel_name])

        # Entity header definitions and values
        for def_name, def_cls in def_members:
            if def_name == obj_definitions.name:
                break
        # Appending headers and values in worksheet
        excel_headers = []
        header_values = []
        for field, excel_header in obj_definitions.excel_headers_map.items():
            header_values.append(getattr(obj_definitions, field))
            excel_headers.append(excel_header)
        worksheet.append(excel_headers)
        worksheet.append(header_values)

        # Properties assignment for ObjectType, DatasetType, and CollectionType
        if obj_instance.base_name in ["ObjectType", "DatasetType", "CollectionType"]:
            if not obj_instance.properties:
                continue
            worksheet.append(
                list(obj_instance.properties[0].excel_headers_map.values())
            )
            for prop in obj_instance.properties:
                row = []
                for field in prop.excel_headers_map.keys():
                    if field == "data_type":
                        val = prop.data_type.value
                    else:
                        val = getattr(prop, field)
                    row.append(val)
                worksheet.append(row)
        # Terms assignment for VocabularyType
        elif obj_instance.base_name == "VocabularyType":
            if not obj_instance.terms:
                continue
            worksheet.append(list(obj_instance.terms[0].excel_headers_map.values()))
            for term in obj_instance.terms:
                worksheet.append(
                    getattr(term, f_set) for f_set in term.excel_headers_map.keys()
                )
        worksheet.append([""])  # empty row after entity definitions

bam_masterdata.cli.entities_to_rdf

BAM = Namespace('https://bamresearch.github.io/bam-masterdata/#')

PROV = Namespace('http://www.w3.org/ns/prov#')

rdf_graph_init(g)

Initialize the RDF graph with base namespaces, annotation properties, and internal BAM properties. This function also creates placeholders for PropertyType and other entity types. The graph is to be printed out in RDF/XML format in the entities_to_rdf function.

PARAMETER DESCRIPTION
g

The RDF graph to be initialized.

TYPE: Graph

Source code in bam_masterdata/cli/entities_to_rdf.py
def rdf_graph_init(g: "Graph") -> None:
    """
    Initialize the RDF graph with base namespaces, annotation properties, and internal BAM properties. This
    function also creates placeholders for PropertyType and other entity types. The graph is to be printed out
    in RDF/XML format in the `entities_to_rdf` function.

    Args:
        g (Graph): The RDF graph to be initialized.
    """
    # Adding base namespaces
    g.bind("dc", DC)
    g.bind("owl", OWL)
    g.bind("rdf", RDF)
    g.bind("rdfs", RDFS)
    g.bind("bam", BAM)
    g.bind("prov", PROV)

    # Adding annotation properties from base namespaces
    annotation_props = [
        RDFS.label,
        RDFS.comment,
        DC.identifier,
    ]
    for prop in annotation_props:
        g.add((prop, RDF.type, OWL.AnnotationProperty))

    # Custom annotation properties from openBIS: `dataType`, `propertyLabel
    custom_annotation_props = {
        BAM[
            "dataType"
        ]: """Represents the data type of a property as defined in the openBIS platform.
        This annotation is used to ensure alignment with the native data types in openBIS,
        facilitating seamless integration and data exchange.

        The allowed values for this annotation correspond directly to the openBIS type system,
        including BOOLEAN, CONTROLLEDVOCABULARY, DATE, HYPERLINK, INTEGER, MULTILINE_VARCHAR, OBJECT,
        REAL, TIMESTAMP, VARCHAR, and XML.

        While `bam:dataType` is primarily intended for internal usage with openBIS, mappings to
        standard vocabularies such as `xsd` (e.g., `xsd:boolean`, `xsd:string`) are possible to use and documented to
        enhance external interoperability. The full mapping is:
        - BOOLEAN: xsd:boolean
        - CONTROLLEDVOCABULARY: xsd:string
        - DATE: xsd:date
        - HYPERLINK: xsd:anyURI
        - INTEGER: xsd:integer
        - MULTILINE_VARCHAR: xsd:string
        - OBJECT: bam:ObjectType
        - REAL: xsd:decimal
        - TIMESTAMP: xsd:dateTime
        - VARCHAR: xsd:string
        - XML: xsd:string""",
        BAM[
            "propertyLabel"
        ]: """A UI-specific annotation used in openBIS to provide an alternative label for a property
        displayed in the frontend. Not intended for semantic reasoning or interoperability beyond openBIS.""",
    }
    for custom_prop, custom_prop_def in custom_annotation_props.items():
        g.add((custom_prop, RDF.type, OWL.AnnotationProperty))
        g.add(
            (
                custom_prop,
                RDFS.label,
                Literal(f"bam:{custom_prop.split('/')[-1]}", lang="en"),
            )
        )
        g.add((custom_prop, RDFS.comment, Literal(custom_prop_def, lang="en")))

    # Internal BAM properties
    # ? `section`, `ordinal`, `show_in_edit_views`?
    bam_props_uri = {
        BAM["hasMandatoryProperty"]: [
            (RDF.type, OWL.ObjectProperty),
            # (RDFS.domain, OWL.Class),
            (RDFS.range, BAM.PropertyType),
            (RDFS.label, Literal("hasMandatoryProperty", lang="en")),
            (
                RDFS.comment,
                Literal(
                    "The property must be mandatorily filled when creating the object in openBIS.",
                    lang="en",
                ),
            ),
        ],
        BAM["hasOptionalProperty"]: [
            (RDF.type, OWL.ObjectProperty),
            # (RDFS.domain, OWL.Class),
            (RDFS.range, BAM.PropertyType),
            (RDFS.label, Literal("hasOptionalProperty", lang="en")),
            (
                RDFS.comment,
                Literal(
                    "The property is optionally filled when creating the object in openBIS.",
                    lang="en",
                ),
            ),
        ],
        BAM["referenceTo"]: [
            (RDF.type, OWL.ObjectProperty),
            (RDFS.domain, BAM.PropertyType),  # Restricting domain to PropertyType
            # (RDFS.range, OWL.Class),  # Explicitly setting range to ObjectType
            (RDFS.label, Literal("referenceTo", lang="en")),
            (
                RDFS.comment,
                Literal(
                    "The property is referencing an object existing in openBIS.",
                    lang="en",
                ),
            ),
        ],
    }
    for prop_uri, obj_properties in bam_props_uri.items():
        for prop in obj_properties:  # type: ignore
            g.add((prop_uri, prop[0], prop[1]))  # type: ignore

    # Adding base PropertyType and other objects as placeholders
    # ! add only PropertyType
    prop_type_description = """A conceptual placeholder used to define and organize properties as first-class entities.
        PropertyType is used to place properties and define their metadata, separating properties from the
        entities they describe.

        In integration scenarios:
        - PropertyType can align with `BFO:Quality` for inherent attributes.
        - PropertyType can represent `BFO:Role` if properties serve functional purposes.
        - PropertyType can be treated as a `prov:Entity` when properties participate in provenance relationships."""
    for entity in ["PropertyType", "ObjectType", "CollectionType", "DatasetType"]:
        entity_uri = BAM[entity]
        g.add((entity_uri, RDF.type, OWL.Thing))
        g.add((entity_uri, RDFS.label, Literal(entity, lang="en")))
        if entity == "PropertyType":
            g.add((entity_uri, RDFS.comment, Literal(prop_type_description, lang="en")))

entities_to_rdf(graph, module_path, logger)

Convert the entities defined in the specified module to RDF triples and add them to the graph. The function uses the model_to_rdf method defined in each class to convert the class attributes to RDF triples. The function also adds the PropertyType and other entity types as placeholders in the graph.

PARAMETER DESCRIPTION
graph

The RDF graph to which the entities are added.

TYPE: Graph

module_path

The path to the module containing the entities to be converted.

TYPE: str

logger

The logger to log messages.

TYPE: BoundLoggerLazyProxy

Source code in bam_masterdata/cli/entities_to_rdf.py
def entities_to_rdf(
    graph: "Graph", module_path: str, logger: "BoundLoggerLazyProxy"
) -> None:
    """
    Convert the entities defined in the specified module to RDF triples and add them to the graph. The function
    uses the `model_to_rdf` method defined in each class to convert the class attributes to RDF triples. The
    function also adds the PropertyType and other entity types as placeholders in the graph.

    Args:
        graph (Graph): The RDF graph to which the entities are added.
        module_path (str): The path to the module containing the entities to be converted.
        logger (BoundLoggerLazyProxy): The logger to log messages.
    """
    rdf_graph_init(graph)

    module = import_module(module_path=module_path)

    # All datamodel modules
    # OBJECT/DATASET/COLLECTION TYPES
    # skos:prefLabel used for class names
    # skos:definition used for `description` (en, de)
    # dc:identifier used for `code`  # ! only defined for internal codes with $ symbol
    # parents defined from `code`
    # assigned properties can be Mandatory or Optional, can be PropertyType or ObjectType
    # ? For OBJECT TYPES
    # ? `generated_code_prefix`, `auto_generate_codes`?
    for name, obj in inspect.getmembers(module, inspect.isclass):
        # Ensure the class has the `model_to_rdf` method
        if not hasattr(obj, "defs") or not callable(getattr(obj, "model_to_rdf")):
            continue
        try:
            # Instantiate the class and call the method
            entity = obj()
            entity.model_to_rdf(namespace=BAM, graph=graph, logger=logger)
        except Exception as err:
            click.echo(f"Failed to process class {name} in {module_path}: {err}")

bam_masterdata.excel.excel_to_entities

MasterdataExcelExtractor

Source code in bam_masterdata/excel/excel_to_entities.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
class MasterdataExcelExtractor:
    # TODO move these validation rules to a separate json
    VALIDATION_RULES: dict[str, dict[str, dict[str, Any]]] = {}

    def __init__(self, excel_path: str, **kwargs):
        """Initialize the MasterdataExtractor."""
        self.excel_path = excel_path
        self.row_cell_info = kwargs.get("row_cell_info", False)
        self.workbook = openpyxl.load_workbook(excel_path)
        self.logger = kwargs.get("logger", logger)

        # Load validation rules at initialization
        if not MasterdataExcelExtractor.VALIDATION_RULES:
            self.VALIDATION_RULES = load_validation_rules(
                self.logger,
                os.path.join(VALIDATION_RULES_DIR, "excel_validation_rules.json"),
            )

    def index_to_excel_column(self, index: int) -> str:
        """
        Converts a 1-based index to an Excel column name.

        Args:
            index: The 1-based index to convert.

        Returns:
            The corresponding Excel column name.
        """
        if not index >= 1:
            raise ValueError("Index must be a positive integer starting from 1.")

        column = ""
        while index > 0:
            index, remainder = divmod(index - 1, 26)
            column = chr(65 + remainder) + column
        return column

    def get_last_non_empty_row(
        self, sheet: "Worksheet", start_index: int
    ) -> int | None:
        """
        Finds the last non-empty row before encountering a completely empty row.

        Args:
            sheet: The worksheet object.
            start_index: The row number to start checking from (1-based index).

        Returns:
            The row number of the last non-empty row before an empty row is encountered,
            or None if no non-empty rows are found starting from the given index.
        """
        if start_index < 1 or start_index > sheet.max_row:
            raise ValueError(
                f"Invalid start index: {start_index}. It must be between 1 and {sheet.max_row}."
            )

        last_non_empty_row = None
        for row in range(start_index, sheet.max_row + 1):
            if all(
                sheet.cell(row=row, column=col).value in (None, "")
                for col in range(1, sheet.max_column + 1)
            ):
                return last_non_empty_row  # Return the last non-empty row before the current empty row

            last_non_empty_row = row  # Update the last non-empty row

        return last_non_empty_row  # If no empty row is encountered, return the last non-empty row

    def str_to_bool(
        self,
        value: str | bool | None,
        term: str,
        coordinate: str,
        sheet_title: str,
    ) -> bool:
        """
        Converts a string to a boolean value.

        Args:
            value: The string to convert.

        Returns:
            The boolean value.
        """
        # No `value` provided
        if not value:
            return False

        val = str(value).strip().lower()
        if val not in ["true", "false"]:
            self.logger.error(
                f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}. Accepted values: TRUE or FALSE.",
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
        return val == "true"

    def get_and_check_property(
        self,
        value: str | bool | None,
        term: str,
        coordinate: str,
        sheet_title: str,
        is_description: bool = False,
        is_code: bool = False,
        is_data: bool = False,
        is_url: bool = False,
    ) -> str:
        """
        Gets a property and checks its format.

        Args:
            value: The string to convert.

        Returns:
            The property.
        """

        # No `value` provided
        if not value:
            return ""

        val = str(value)
        error_message = f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}."
        if is_description:
            if not re.match(r".*//.*", val):
                self.logger.error(
                    error_message
                    + "Description should follow the schema: English Description + '//' + German Description. ",
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        elif is_code:
            if not re.match(r"^\$?[A-Z0-9_.]+$", val):
                self.logger.error(
                    error_message,
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        elif is_data:
            if val not in [dt.value for dt in DataType]:
                self.logger.error(
                    error_message
                    + f"The Data Type should be one of the following: {[dt.value for dt in DataType]}",
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
                val = val.upper()
        elif is_url:
            if not re.match(
                r"https?://(?:www\.)?[a-zA-Z0-9-._~:/?#@!$&'()*+,;=%]+", val
            ):
                self.logger.error(
                    error_message,
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        else:
            if not re.match(r".*", val):
                self.logger.error(
                    error_message,
                    term=term,
                    cell_value=val,
                    cell_coordinate=coordinate,
                    sheet_title=sheet_title,
                )
        return val

    # Helper function to process each term
    def process_term(
        self, term: str, cell_value: Any, coordinate: str, sheet_title: str
    ) -> Any:
        """
        Processes a term by converting it to a boolean if necessary or checking its validity.

        Args:
            term: The term being processed.
            cell_value: The value of the cell.
            coordinate: The coordinate of the cell in the sheet.
            sheet_title: The title of the sheet.

        Returns:
            The processed value, either as a boolean or the original value after validation.
        """
        # Check if the term is a boolean type
        if term in ("Mandatory", "Show in edit views"):
            return self.str_to_bool(
                value=cell_value,
                term=term,
                coordinate=coordinate,
                sheet_title=sheet_title,
            )
        # Check and validate the property
        return self.get_and_check_property(
            value=cell_value,
            term=term,
            coordinate=coordinate,
            sheet_title=sheet_title,
            is_code=(term in ["Code", "Vocabulary code"]),
            is_data=(term == "Data type"),
        )

    def extract_value(
        self,
        sheet: "Worksheet",
        row: int,
        column: int,
        validation_pattern: str = None,
        is_description: bool = False,
        is_data: bool = False,
        is_url: bool = False,
    ) -> str:
        """
        Extracts and validates a value from a specified cell in the Excel sheet.

        Args:
            sheet: The worksheet object.
            row: The row number of the cell (1-based index).
            column: The column number of the cell (1-based index).
            validation_pattern: Optional regex pattern to validate the cell value.
            is_description: Flag indicating if the value is a description.
            is_data: Flag indicating if the value is a data type.
            is_url: Flag indicating if the value is a URL.

        Returns:
            The extracted and validated cell value as a string. Returns an empty string if the value is invalid or not provided.
        """
        value = sheet.cell(row=row, column=column).value

        # No `value` provided
        if not value:
            return ""

        validated = (
            bool(re.match(validation_pattern, str(value)))
            if validation_pattern
            else True
        )
        error_message = f"Invalid value '{value}' at row {row}, column {column} in sheet {sheet.title}"

        if is_description:
            error_message += " Description should follow the schema: English Description + '//' + German Description."
        elif is_data:
            validated = str(value) in [dt.value for dt in DataType]
            error_message += f" The Data Type should be one of the following: {[dt.value for dt in DataType]}"
        elif is_url:
            error_message += " It should be an URL or empty"

        if not validated:
            self.logger.error(
                error_message,
                cell_value=value,
                sheet_title=sheet.title,
                row=row,
                column=column,
            )

        return value or ""

    def process_entity(
        self,
        sheet: "Worksheet",
        start_index_row: int,
        header_terms: list[str],
        expected_terms: list[str],
        entity_type: str,
    ) -> dict[str, Any]:
        """
        Process an entity type block in the Excel sheet and return its attributes as a dictionary.

        Args:
            sheet: The worksheet object.
            start_index_row: The row where the current entity type begins (1-based index).
            header_terms: List of header terms in the entity block.
            expected_terms: List of expected terms to extract from the entity block.
            entity_type: The type of the entity (e.g., SAMPLE_TYPE, OBJECT_TYPE).

        Returns:
            A dictionary containing the attributes of the entity.
        """
        attributes: dict = {}
        cell_value: Any = ""

        for term in expected_terms:
            if term not in header_terms:
                self.logger.error(f"{term} not found in the headers.", term=term)
            else:
                term_index = header_terms.index(term)
                cell = sheet.cell(row=start_index_row + 2, column=term_index + 1)
                cell_value = self.extract_value(
                    sheet,
                    start_index_row + 2,
                    term_index + 1,
                    self.VALIDATION_RULES[entity_type][term].get("pattern"),
                )

                # Handle boolean conversion
                if self.VALIDATION_RULES[entity_type][term].get("is_bool"):
                    cell_value = self.str_to_bool(
                        value=cell_value,
                        term=term,
                        coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

                # Handle data type validation
                elif self.VALIDATION_RULES[entity_type][term].get("is_data"):
                    if cell_value not in [dt.value for dt in DataType]:
                        self.logger.error(
                            f"Invalid Data Type: {cell_value} in {cell.coordinate} (Sheet: {sheet.title}). Should be one of the following: {[dt.value for dt in DataType]}",
                            term=term,
                            cell_value=cell_value,
                            cell_coordinate=cell.coordinate,
                            sheet_title=sheet.title,
                        )

                # Handle additional validation for "Generated code prefix"
                elif (
                    self.VALIDATION_RULES[entity_type][term].get("extra_validation")
                    == "is_reduced_version"
                ):
                    if not is_reduced_version(cell_value, attributes.get("code", "")):
                        self.logger.warning(
                            f"Invalid {term} value '{cell_value}' in {cell.coordinate} (Sheet: {sheet.title}). "
                            f"Generated code prefix should be part of the 'Code' {attributes.get('code', '')}.",
                            term=term,
                            cell_value=cell_value,
                            cell_coordinate=cell.coordinate,
                            sheet_title=sheet.title,
                        )

                # Handle validation script (allows empty but must match pattern if provided)
                elif (
                    self.VALIDATION_RULES[entity_type][term].get("allow_empty")
                    and not cell_value
                ):
                    cell_value = None

                # Handle URL template validation (allows empty but must be a valid URL)
                elif (
                    self.VALIDATION_RULES[entity_type][term].get("is_url")
                    and cell_value
                ):
                    url_pattern = self.VALIDATION_RULES[entity_type][term].get(
                        "pattern"
                    )
                    if not re.match(url_pattern, str(cell_value)):
                        self.logger.error(
                            f"Invalid URL format: {cell_value} in {cell.coordinate} (Sheet: {sheet.title})",
                            cell_value=cell_value,
                            cell_coordinate=cell.coordinate,
                            sheet_title=sheet.title,
                        )

                # Add the extracted value to the attributes dictionary
                attributes[self.VALIDATION_RULES[entity_type][term].get("key")] = (
                    cell_value
                )

        if self.row_cell_info:
            attributes["row_location"] = f"A{start_index_row}"
        return attributes

    def properties_to_dict(
        self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
    ) -> dict[str, dict[str, Any]]:
        """
        Extracts properties from an Entity type block in the Excel sheet and returns them as a dictionary.

        Args:
            sheet: The worksheet object.
            start_index_row: Row where the current entity type begins (1-based index).
            last_non_empty_row: Row where the current entity type finish (1-based index).

        Returns:
            A dictionary where each key is a property code and the value is a dictionary
            containing the attributes of the property.
        """
        property_dict: dict = {}
        expected_terms = [
            "Code",
            "Description",
            "Mandatory",
            "Show in edit views",
            "Section",
            "Property label",
            "Data type",
            "Vocabulary code",
            "Metadata",
            "Dynamic script",
            # ! these are not used
            # "Unique",
            # "Internal assignment",
        ]

        # Determine the header row index
        header_index = start_index_row + 3
        row_headers = [(cell.value, cell.coordinate) for cell in sheet[header_index]]
        # And store how many properties are for the entity
        n_properties = last_non_empty_row - header_index
        if n_properties < 0:
            self.logger.error(
                f"No properties found for the entity in sheet {sheet.title} starting at row {start_index_row}."
            )
            return property_dict

        # Initialize a dictionary to store extracted columns
        extracted_columns: dict[str, list] = {term: [] for term in expected_terms}
        if self.row_cell_info:
            extracted_columns["row_location"] = []

        # Extract columns for each expected term
        for i, (term, coordinate) in enumerate(row_headers):
            if term not in expected_terms:
                log_func = (
                    self.logger.warning
                    if term
                    in (
                        "Mandatory",
                        "Show in edit views",
                        "Section",
                        "Metadata",
                        "Dynamic script",
                        "Vocabulary code",
                        # ! these are not used
                        # "Unique",
                        # "Internal assignment",
                    )
                    else self.logger.error
                )
                log_func(f"'{term}' not found in the properties headers.", term=term)
                continue

            # Excel column letter from the coordinate
            term_letter = coordinate[0]

            # Extract values from the column
            for cell_property in sheet[term_letter][header_index:last_non_empty_row]:
                extracted_columns[term].append(
                    self.process_term(
                        term, cell_property.value, cell_property.coordinate, sheet.title
                    )
                )
                if self.row_cell_info:
                    extracted_columns["row_location"].append(cell_property.coordinate)

        # Combine extracted values into a dictionary
        for i in range(n_properties):
            code = extracted_columns.get("Code", [])
            if not code:
                self.logger.error(
                    f"'Code' not found in the properties headers for sheet {sheet.title}."
                )
                return property_dict
            code = code[i]
            property_dict[code] = {"permId": code, "code": code}
            for key, pybis_val in {
                "Description": "description",
                "Section": "section",
                "Mandatory": "mandatory",
                "Show in edit views": "show_in_edit_views",
                "Property label": "label",
                "Data type": "dataType",
                "Vocabulary code": "vocabularyCode",
            }.items():
                data_column = extracted_columns.get(key, [])
                if not data_column:
                    continue
                property_dict[code][pybis_val] = data_column[i]
            if self.row_cell_info:
                property_dict[code]["row_location"] = (
                    extracted_columns.get("row_location")[i],
                )
            # Only add optional fields if they exist in extracted_columns
            optional_fields = [
                "Metadata",
                "Dynamic script",
                "Unique",
                "Internal assignment",
            ]
            for field in optional_fields:
                if (
                    field in extracted_columns
                ):  # Check if the field exists in the extracted columns
                    if extracted_columns[field][i] == "":
                        extracted_columns[field][i] = None
                    property_dict[extracted_columns["Code"][i]][
                        field.lower().replace(" ", "_")
                    ] = extracted_columns[field][i]

        return property_dict

    def terms_to_dict(
        self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
    ) -> dict[str, dict[str, Any]]:
        """
        Extracts terms from a Vocabulary block in the Excel sheet and returns them as a dictionary.

        Args:
            sheet: The worksheet object.
            start_index_row: Row where the current entity type begins (1-based index).
            last_non_empty_row: Row where the current entity type finish (1-based index).

        Returns:
            A dictionary where each key is a vocabulary term code and the value is a dictionary
            containing the attributes of the vocabulary term.
        """
        terms_dict = {}
        expected_terms = ["Code", "Description", "Url template", "Label", "Official"]

        header_index = start_index_row + 3
        row_headers = [cell.value for cell in sheet[header_index]]

        # Initialize a dictionary to store extracted columns
        extracted_columns: dict[str, list] = {term: [] for term in expected_terms}

        # Helper function to process each term
        def process_term(term, cell_value, coordinate, sheet_title):
            if term == "Official":
                return self.str_to_bool(
                    value=cell_value,
                    term=term,
                    coordinate=coordinate,
                    sheet_title=sheet_title,
                )
            return self.get_and_check_property(
                value=cell_value,
                term=term,
                coordinate=coordinate,
                sheet_title=sheet_title,
                is_code=(term == "Code"),
                is_url=(term == "Url template"),
            )

        # Extract columns for each expected term
        for term in expected_terms:
            if term not in row_headers:
                self.logger.warning(
                    f"{term} not found in the properties headers.", term=term
                )
                continue

            # Get column index and Excel letter
            term_index = row_headers.index(term) + 1
            term_letter = self.index_to_excel_column(term_index)

            # Extract values from the column
            for cell in sheet[term_letter][header_index:last_non_empty_row]:
                extracted_columns[term].append(
                    process_term(term, cell.value, cell.coordinate, sheet.title)
                )

        # Combine extracted values into a dictionary
        for i in range(len(extracted_columns["Code"])):
            terms_dict[extracted_columns["Code"][i]] = {
                "permId": extracted_columns["Code"][i],
                "code": extracted_columns["Code"][i],
                "descriptions": extracted_columns["Description"][i],
                "url_template": extracted_columns["Url template"][i],
                "label": extracted_columns["Label"][i],
                "official": extracted_columns["Official"][i],
            }

        return terms_dict

    def block_to_entity_dict(
        self,
        sheet: "Worksheet",
        start_index_row: int,
        last_non_empty_row: int,
        complete_dict: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Extracts entity attributes from an Excel sheet block and returns them as a dictionary.
        """
        attributes_dict: dict = {}

        # Get the entity type
        entity_type = sheet[f"A{start_index_row}"].value
        if entity_type not in self.VALIDATION_RULES:
            raise ValueError(f"Invalid entity type: {entity_type}")

        # Get the header terms
        header_terms = [cell.value for cell in sheet[start_index_row + 1]]

        # Process entity data using the helper function
        attributes_dict = self.process_entity(
            sheet,
            start_index_row,
            header_terms,
            list(self.VALIDATION_RULES[entity_type].keys()),
            entity_type,
        )

        # Extract additional attributes if necessary
        if entity_type in {
            "SAMPLE_TYPE",
            "OBJECT_TYPE",
            "EXPERIMENT_TYPE",
            "DATASET_TYPE",
        }:
            attributes_dict["properties"] = (
                self.properties_to_dict(sheet, start_index_row, last_non_empty_row)
                or {}
            )

        elif entity_type == "VOCABULARY_TYPE":
            attributes_dict["terms"] = (
                self.terms_to_dict(sheet, start_index_row, last_non_empty_row) or {}
            )

        # Add the entity to the complete dictionary
        complete_dict[attributes_dict["code"]] = attributes_dict

        # Return sorted dictionary
        return dict(sorted(complete_dict.items(), key=lambda item: item[0].count(".")))

    def excel_to_entities(self) -> dict[str, dict[str, Any]]:
        """
        Extracts entities from an Excel file and returns them as a dictionary.

        Returns:
            dict[str, dict[str, Any]]: A dictionary where each key is a normalized sheet name and the value is a dictionary
            containing the extracted entities. Returns an empty dictionary if all sheets are empty.
        """
        sheets_dict: dict[str, dict[str, Any]] = {}
        sheet_names = self.workbook.sheetnames
        has_content = False  # Track if any sheet has valid content

        for i, sheet_name in enumerate(sheet_names):
            normalized_sheet_name = sheet_name.lower().replace(" ", "_")
            sheet = self.workbook[sheet_name]
            start_row = 1

            # **Check if the sheet is empty**
            if all(
                sheet.cell(row=row, column=col).value in (None, "")
                for row in range(1, sheet.max_row + 1)
                for col in range(1, sheet.max_column + 1)
            ):
                self.logger.info(f"Skipping empty sheet: {sheet_name}")
                continue  # Move to the next sheet

            sheets_dict[normalized_sheet_name] = {}

            consecutive_empty_rows = 0  # Track consecutive empty rows
            while start_row <= sheet.max_row:
                # **Check for two consecutive empty rows**
                is_row_empty = all(
                    sheet.cell(row=start_row, column=col).value in (None, "")
                    for col in range(1, sheet.max_column + 1)
                )

                if is_row_empty:
                    consecutive_empty_rows += 1
                    if consecutive_empty_rows >= 2:
                        # **Reached the end of the sheet, move to the next**
                        if i == len(sheet_names) - 1:
                            self.logger.info(
                                f"Last sheet {sheet_name} processed. End of the file reached."
                            )
                        else:
                            self.logger.info(
                                f"End of the current sheet {sheet_name} reached. Switching to next sheet..."
                            )
                        break  # Stop processing this sheet
                else:
                    consecutive_empty_rows = 0  # Reset if we find a non-empty row

                    # **Process the entity block**
                    last_non_empty_row = self.get_last_non_empty_row(sheet, start_row)
                    if last_non_empty_row is None:
                        break  # No more valid blocks

                    sheets_dict[normalized_sheet_name] = self.block_to_entity_dict(
                        sheet,
                        start_row,
                        last_non_empty_row,
                        sheets_dict[normalized_sheet_name],
                    )
                    has_content = True  # Found valid content

                    # Move to the next entity block
                    start_row = last_non_empty_row + 1
                    continue  # Continue loop without increasing consecutive_empty_rows

                start_row += 1  # Move to the next row

        # **If no sheets had content, return an empty dictionary**
        if not has_content:
            self.logger.warning(
                "No valid data found in any sheets. Returning empty dictionary."
            )
            return {}

        return sheets_dict

VALIDATION_RULES = {}

excel_path = excel_path

row_cell_info = kwargs.get('row_cell_info', False)

workbook = openpyxl.load_workbook(excel_path)

logger = kwargs.get('logger', logger)

__init__(excel_path, **kwargs)

Initialize the MasterdataExtractor.

Source code in bam_masterdata/excel/excel_to_entities.py
def __init__(self, excel_path: str, **kwargs):
    """Initialize the MasterdataExtractor."""
    self.excel_path = excel_path
    self.row_cell_info = kwargs.get("row_cell_info", False)
    self.workbook = openpyxl.load_workbook(excel_path)
    self.logger = kwargs.get("logger", logger)

    # Load validation rules at initialization
    if not MasterdataExcelExtractor.VALIDATION_RULES:
        self.VALIDATION_RULES = load_validation_rules(
            self.logger,
            os.path.join(VALIDATION_RULES_DIR, "excel_validation_rules.json"),
        )

index_to_excel_column(index)

Converts a 1-based index to an Excel column name.

PARAMETER DESCRIPTION
index

The 1-based index to convert.

TYPE: int

RETURNS DESCRIPTION
str

The corresponding Excel column name.

Source code in bam_masterdata/excel/excel_to_entities.py
def index_to_excel_column(self, index: int) -> str:
    """
    Converts a 1-based index to an Excel column name.

    Args:
        index: The 1-based index to convert.

    Returns:
        The corresponding Excel column name.
    """
    if not index >= 1:
        raise ValueError("Index must be a positive integer starting from 1.")

    column = ""
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        column = chr(65 + remainder) + column
    return column

get_last_non_empty_row(sheet, start_index)

Finds the last non-empty row before encountering a completely empty row.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index

The row number to start checking from (1-based index).

TYPE: int

RETURNS DESCRIPTION
int | None

The row number of the last non-empty row before an empty row is encountered,

int | None

or None if no non-empty rows are found starting from the given index.

Source code in bam_masterdata/excel/excel_to_entities.py
def get_last_non_empty_row(
    self, sheet: "Worksheet", start_index: int
) -> int | None:
    """
    Finds the last non-empty row before encountering a completely empty row.

    Args:
        sheet: The worksheet object.
        start_index: The row number to start checking from (1-based index).

    Returns:
        The row number of the last non-empty row before an empty row is encountered,
        or None if no non-empty rows are found starting from the given index.
    """
    if start_index < 1 or start_index > sheet.max_row:
        raise ValueError(
            f"Invalid start index: {start_index}. It must be between 1 and {sheet.max_row}."
        )

    last_non_empty_row = None
    for row in range(start_index, sheet.max_row + 1):
        if all(
            sheet.cell(row=row, column=col).value in (None, "")
            for col in range(1, sheet.max_column + 1)
        ):
            return last_non_empty_row  # Return the last non-empty row before the current empty row

        last_non_empty_row = row  # Update the last non-empty row

    return last_non_empty_row  # If no empty row is encountered, return the last non-empty row

str_to_bool(value, term, coordinate, sheet_title)

Converts a string to a boolean value.

PARAMETER DESCRIPTION
value

The string to convert.

TYPE: str | bool | None

RETURNS DESCRIPTION
bool

The boolean value.

Source code in bam_masterdata/excel/excel_to_entities.py
def str_to_bool(
    self,
    value: str | bool | None,
    term: str,
    coordinate: str,
    sheet_title: str,
) -> bool:
    """
    Converts a string to a boolean value.

    Args:
        value: The string to convert.

    Returns:
        The boolean value.
    """
    # No `value` provided
    if not value:
        return False

    val = str(value).strip().lower()
    if val not in ["true", "false"]:
        self.logger.error(
            f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}. Accepted values: TRUE or FALSE.",
            term=term,
            cell_value=val,
            cell_coordinate=coordinate,
            sheet_title=sheet_title,
        )
    return val == "true"

get_and_check_property(value, term, coordinate, sheet_title, is_description=False, is_code=False, is_data=False, is_url=False)

Gets a property and checks its format.

PARAMETER DESCRIPTION
value

The string to convert.

TYPE: str | bool | None

RETURNS DESCRIPTION
str

The property.

Source code in bam_masterdata/excel/excel_to_entities.py
def get_and_check_property(
    self,
    value: str | bool | None,
    term: str,
    coordinate: str,
    sheet_title: str,
    is_description: bool = False,
    is_code: bool = False,
    is_data: bool = False,
    is_url: bool = False,
) -> str:
    """
    Gets a property and checks its format.

    Args:
        value: The string to convert.

    Returns:
        The property.
    """

    # No `value` provided
    if not value:
        return ""

    val = str(value)
    error_message = f"Invalid {term.lower()} value found in the {term} column at position {coordinate} in {sheet_title}."
    if is_description:
        if not re.match(r".*//.*", val):
            self.logger.error(
                error_message
                + "Description should follow the schema: English Description + '//' + German Description. ",
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    elif is_code:
        if not re.match(r"^\$?[A-Z0-9_.]+$", val):
            self.logger.error(
                error_message,
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    elif is_data:
        if val not in [dt.value for dt in DataType]:
            self.logger.error(
                error_message
                + f"The Data Type should be one of the following: {[dt.value for dt in DataType]}",
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
            val = val.upper()
    elif is_url:
        if not re.match(
            r"https?://(?:www\.)?[a-zA-Z0-9-._~:/?#@!$&'()*+,;=%]+", val
        ):
            self.logger.error(
                error_message,
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    else:
        if not re.match(r".*", val):
            self.logger.error(
                error_message,
                term=term,
                cell_value=val,
                cell_coordinate=coordinate,
                sheet_title=sheet_title,
            )
    return val

process_term(term, cell_value, coordinate, sheet_title)

Processes a term by converting it to a boolean if necessary or checking its validity.

PARAMETER DESCRIPTION
term

The term being processed.

TYPE: str

cell_value

The value of the cell.

TYPE: Any

coordinate

The coordinate of the cell in the sheet.

TYPE: str

sheet_title

The title of the sheet.

TYPE: str

RETURNS DESCRIPTION
Any

The processed value, either as a boolean or the original value after validation.

Source code in bam_masterdata/excel/excel_to_entities.py
def process_term(
    self, term: str, cell_value: Any, coordinate: str, sheet_title: str
) -> Any:
    """
    Processes a term by converting it to a boolean if necessary or checking its validity.

    Args:
        term: The term being processed.
        cell_value: The value of the cell.
        coordinate: The coordinate of the cell in the sheet.
        sheet_title: The title of the sheet.

    Returns:
        The processed value, either as a boolean or the original value after validation.
    """
    # Check if the term is a boolean type
    if term in ("Mandatory", "Show in edit views"):
        return self.str_to_bool(
            value=cell_value,
            term=term,
            coordinate=coordinate,
            sheet_title=sheet_title,
        )
    # Check and validate the property
    return self.get_and_check_property(
        value=cell_value,
        term=term,
        coordinate=coordinate,
        sheet_title=sheet_title,
        is_code=(term in ["Code", "Vocabulary code"]),
        is_data=(term == "Data type"),
    )

extract_value(sheet, row, column, validation_pattern=None, is_description=False, is_data=False, is_url=False)

Extracts and validates a value from a specified cell in the Excel sheet.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

row

The row number of the cell (1-based index).

TYPE: int

column

The column number of the cell (1-based index).

TYPE: int

validation_pattern

Optional regex pattern to validate the cell value.

TYPE: str DEFAULT: None

is_description

Flag indicating if the value is a description.

TYPE: bool DEFAULT: False

is_data

Flag indicating if the value is a data type.

TYPE: bool DEFAULT: False

is_url

Flag indicating if the value is a URL.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

The extracted and validated cell value as a string. Returns an empty string if the value is invalid or not provided.

Source code in bam_masterdata/excel/excel_to_entities.py
def extract_value(
    self,
    sheet: "Worksheet",
    row: int,
    column: int,
    validation_pattern: str = None,
    is_description: bool = False,
    is_data: bool = False,
    is_url: bool = False,
) -> str:
    """
    Extracts and validates a value from a specified cell in the Excel sheet.

    Args:
        sheet: The worksheet object.
        row: The row number of the cell (1-based index).
        column: The column number of the cell (1-based index).
        validation_pattern: Optional regex pattern to validate the cell value.
        is_description: Flag indicating if the value is a description.
        is_data: Flag indicating if the value is a data type.
        is_url: Flag indicating if the value is a URL.

    Returns:
        The extracted and validated cell value as a string. Returns an empty string if the value is invalid or not provided.
    """
    value = sheet.cell(row=row, column=column).value

    # No `value` provided
    if not value:
        return ""

    validated = (
        bool(re.match(validation_pattern, str(value)))
        if validation_pattern
        else True
    )
    error_message = f"Invalid value '{value}' at row {row}, column {column} in sheet {sheet.title}"

    if is_description:
        error_message += " Description should follow the schema: English Description + '//' + German Description."
    elif is_data:
        validated = str(value) in [dt.value for dt in DataType]
        error_message += f" The Data Type should be one of the following: {[dt.value for dt in DataType]}"
    elif is_url:
        error_message += " It should be an URL or empty"

    if not validated:
        self.logger.error(
            error_message,
            cell_value=value,
            sheet_title=sheet.title,
            row=row,
            column=column,
        )

    return value or ""

process_entity(sheet, start_index_row, header_terms, expected_terms, entity_type)

Process an entity type block in the Excel sheet and return its attributes as a dictionary.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index_row

The row where the current entity type begins (1-based index).

TYPE: int

header_terms

List of header terms in the entity block.

TYPE: list[str]

expected_terms

List of expected terms to extract from the entity block.

TYPE: list[str]

entity_type

The type of the entity (e.g., SAMPLE_TYPE, OBJECT_TYPE).

TYPE: str

RETURNS DESCRIPTION
dict[str, Any]

A dictionary containing the attributes of the entity.

Source code in bam_masterdata/excel/excel_to_entities.py
def process_entity(
    self,
    sheet: "Worksheet",
    start_index_row: int,
    header_terms: list[str],
    expected_terms: list[str],
    entity_type: str,
) -> dict[str, Any]:
    """
    Process an entity type block in the Excel sheet and return its attributes as a dictionary.

    Args:
        sheet: The worksheet object.
        start_index_row: The row where the current entity type begins (1-based index).
        header_terms: List of header terms in the entity block.
        expected_terms: List of expected terms to extract from the entity block.
        entity_type: The type of the entity (e.g., SAMPLE_TYPE, OBJECT_TYPE).

    Returns:
        A dictionary containing the attributes of the entity.
    """
    attributes: dict = {}
    cell_value: Any = ""

    for term in expected_terms:
        if term not in header_terms:
            self.logger.error(f"{term} not found in the headers.", term=term)
        else:
            term_index = header_terms.index(term)
            cell = sheet.cell(row=start_index_row + 2, column=term_index + 1)
            cell_value = self.extract_value(
                sheet,
                start_index_row + 2,
                term_index + 1,
                self.VALIDATION_RULES[entity_type][term].get("pattern"),
            )

            # Handle boolean conversion
            if self.VALIDATION_RULES[entity_type][term].get("is_bool"):
                cell_value = self.str_to_bool(
                    value=cell_value,
                    term=term,
                    coordinate=cell.coordinate,
                    sheet_title=sheet.title,
                )

            # Handle data type validation
            elif self.VALIDATION_RULES[entity_type][term].get("is_data"):
                if cell_value not in [dt.value for dt in DataType]:
                    self.logger.error(
                        f"Invalid Data Type: {cell_value} in {cell.coordinate} (Sheet: {sheet.title}). Should be one of the following: {[dt.value for dt in DataType]}",
                        term=term,
                        cell_value=cell_value,
                        cell_coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

            # Handle additional validation for "Generated code prefix"
            elif (
                self.VALIDATION_RULES[entity_type][term].get("extra_validation")
                == "is_reduced_version"
            ):
                if not is_reduced_version(cell_value, attributes.get("code", "")):
                    self.logger.warning(
                        f"Invalid {term} value '{cell_value}' in {cell.coordinate} (Sheet: {sheet.title}). "
                        f"Generated code prefix should be part of the 'Code' {attributes.get('code', '')}.",
                        term=term,
                        cell_value=cell_value,
                        cell_coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

            # Handle validation script (allows empty but must match pattern if provided)
            elif (
                self.VALIDATION_RULES[entity_type][term].get("allow_empty")
                and not cell_value
            ):
                cell_value = None

            # Handle URL template validation (allows empty but must be a valid URL)
            elif (
                self.VALIDATION_RULES[entity_type][term].get("is_url")
                and cell_value
            ):
                url_pattern = self.VALIDATION_RULES[entity_type][term].get(
                    "pattern"
                )
                if not re.match(url_pattern, str(cell_value)):
                    self.logger.error(
                        f"Invalid URL format: {cell_value} in {cell.coordinate} (Sheet: {sheet.title})",
                        cell_value=cell_value,
                        cell_coordinate=cell.coordinate,
                        sheet_title=sheet.title,
                    )

            # Add the extracted value to the attributes dictionary
            attributes[self.VALIDATION_RULES[entity_type][term].get("key")] = (
                cell_value
            )

    if self.row_cell_info:
        attributes["row_location"] = f"A{start_index_row}"
    return attributes

properties_to_dict(sheet, start_index_row, last_non_empty_row)

Extracts properties from an Entity type block in the Excel sheet and returns them as a dictionary.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index_row

Row where the current entity type begins (1-based index).

TYPE: int

last_non_empty_row

Row where the current entity type finish (1-based index).

TYPE: int

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

A dictionary where each key is a property code and the value is a dictionary

dict[str, dict[str, Any]]

containing the attributes of the property.

Source code in bam_masterdata/excel/excel_to_entities.py
def properties_to_dict(
    self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
) -> dict[str, dict[str, Any]]:
    """
    Extracts properties from an Entity type block in the Excel sheet and returns them as a dictionary.

    Args:
        sheet: The worksheet object.
        start_index_row: Row where the current entity type begins (1-based index).
        last_non_empty_row: Row where the current entity type finish (1-based index).

    Returns:
        A dictionary where each key is a property code and the value is a dictionary
        containing the attributes of the property.
    """
    property_dict: dict = {}
    expected_terms = [
        "Code",
        "Description",
        "Mandatory",
        "Show in edit views",
        "Section",
        "Property label",
        "Data type",
        "Vocabulary code",
        "Metadata",
        "Dynamic script",
        # ! these are not used
        # "Unique",
        # "Internal assignment",
    ]

    # Determine the header row index
    header_index = start_index_row + 3
    row_headers = [(cell.value, cell.coordinate) for cell in sheet[header_index]]
    # And store how many properties are for the entity
    n_properties = last_non_empty_row - header_index
    if n_properties < 0:
        self.logger.error(
            f"No properties found for the entity in sheet {sheet.title} starting at row {start_index_row}."
        )
        return property_dict

    # Initialize a dictionary to store extracted columns
    extracted_columns: dict[str, list] = {term: [] for term in expected_terms}
    if self.row_cell_info:
        extracted_columns["row_location"] = []

    # Extract columns for each expected term
    for i, (term, coordinate) in enumerate(row_headers):
        if term not in expected_terms:
            log_func = (
                self.logger.warning
                if term
                in (
                    "Mandatory",
                    "Show in edit views",
                    "Section",
                    "Metadata",
                    "Dynamic script",
                    "Vocabulary code",
                    # ! these are not used
                    # "Unique",
                    # "Internal assignment",
                )
                else self.logger.error
            )
            log_func(f"'{term}' not found in the properties headers.", term=term)
            continue

        # Excel column letter from the coordinate
        term_letter = coordinate[0]

        # Extract values from the column
        for cell_property in sheet[term_letter][header_index:last_non_empty_row]:
            extracted_columns[term].append(
                self.process_term(
                    term, cell_property.value, cell_property.coordinate, sheet.title
                )
            )
            if self.row_cell_info:
                extracted_columns["row_location"].append(cell_property.coordinate)

    # Combine extracted values into a dictionary
    for i in range(n_properties):
        code = extracted_columns.get("Code", [])
        if not code:
            self.logger.error(
                f"'Code' not found in the properties headers for sheet {sheet.title}."
            )
            return property_dict
        code = code[i]
        property_dict[code] = {"permId": code, "code": code}
        for key, pybis_val in {
            "Description": "description",
            "Section": "section",
            "Mandatory": "mandatory",
            "Show in edit views": "show_in_edit_views",
            "Property label": "label",
            "Data type": "dataType",
            "Vocabulary code": "vocabularyCode",
        }.items():
            data_column = extracted_columns.get(key, [])
            if not data_column:
                continue
            property_dict[code][pybis_val] = data_column[i]
        if self.row_cell_info:
            property_dict[code]["row_location"] = (
                extracted_columns.get("row_location")[i],
            )
        # Only add optional fields if they exist in extracted_columns
        optional_fields = [
            "Metadata",
            "Dynamic script",
            "Unique",
            "Internal assignment",
        ]
        for field in optional_fields:
            if (
                field in extracted_columns
            ):  # Check if the field exists in the extracted columns
                if extracted_columns[field][i] == "":
                    extracted_columns[field][i] = None
                property_dict[extracted_columns["Code"][i]][
                    field.lower().replace(" ", "_")
                ] = extracted_columns[field][i]

    return property_dict

terms_to_dict(sheet, start_index_row, last_non_empty_row)

Extracts terms from a Vocabulary block in the Excel sheet and returns them as a dictionary.

PARAMETER DESCRIPTION
sheet

The worksheet object.

TYPE: Worksheet

start_index_row

Row where the current entity type begins (1-based index).

TYPE: int

last_non_empty_row

Row where the current entity type finish (1-based index).

TYPE: int

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

A dictionary where each key is a vocabulary term code and the value is a dictionary

dict[str, dict[str, Any]]

containing the attributes of the vocabulary term.

Source code in bam_masterdata/excel/excel_to_entities.py
def terms_to_dict(
    self, sheet: "Worksheet", start_index_row: int, last_non_empty_row: int
) -> dict[str, dict[str, Any]]:
    """
    Extracts terms from a Vocabulary block in the Excel sheet and returns them as a dictionary.

    Args:
        sheet: The worksheet object.
        start_index_row: Row where the current entity type begins (1-based index).
        last_non_empty_row: Row where the current entity type finish (1-based index).

    Returns:
        A dictionary where each key is a vocabulary term code and the value is a dictionary
        containing the attributes of the vocabulary term.
    """
    terms_dict = {}
    expected_terms = ["Code", "Description", "Url template", "Label", "Official"]

    header_index = start_index_row + 3
    row_headers = [cell.value for cell in sheet[header_index]]

    # Initialize a dictionary to store extracted columns
    extracted_columns: dict[str, list] = {term: [] for term in expected_terms}

    # Helper function to process each term
    def process_term(term, cell_value, coordinate, sheet_title):
        if term == "Official":
            return self.str_to_bool(
                value=cell_value,
                term=term,
                coordinate=coordinate,
                sheet_title=sheet_title,
            )
        return self.get_and_check_property(
            value=cell_value,
            term=term,
            coordinate=coordinate,
            sheet_title=sheet_title,
            is_code=(term == "Code"),
            is_url=(term == "Url template"),
        )

    # Extract columns for each expected term
    for term in expected_terms:
        if term not in row_headers:
            self.logger.warning(
                f"{term} not found in the properties headers.", term=term
            )
            continue

        # Get column index and Excel letter
        term_index = row_headers.index(term) + 1
        term_letter = self.index_to_excel_column(term_index)

        # Extract values from the column
        for cell in sheet[term_letter][header_index:last_non_empty_row]:
            extracted_columns[term].append(
                process_term(term, cell.value, cell.coordinate, sheet.title)
            )

    # Combine extracted values into a dictionary
    for i in range(len(extracted_columns["Code"])):
        terms_dict[extracted_columns["Code"][i]] = {
            "permId": extracted_columns["Code"][i],
            "code": extracted_columns["Code"][i],
            "descriptions": extracted_columns["Description"][i],
            "url_template": extracted_columns["Url template"][i],
            "label": extracted_columns["Label"][i],
            "official": extracted_columns["Official"][i],
        }

    return terms_dict

block_to_entity_dict(sheet, start_index_row, last_non_empty_row, complete_dict)

Extracts entity attributes from an Excel sheet block and returns them as a dictionary.

Source code in bam_masterdata/excel/excel_to_entities.py
def block_to_entity_dict(
    self,
    sheet: "Worksheet",
    start_index_row: int,
    last_non_empty_row: int,
    complete_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Extracts entity attributes from an Excel sheet block and returns them as a dictionary.
    """
    attributes_dict: dict = {}

    # Get the entity type
    entity_type = sheet[f"A{start_index_row}"].value
    if entity_type not in self.VALIDATION_RULES:
        raise ValueError(f"Invalid entity type: {entity_type}")

    # Get the header terms
    header_terms = [cell.value for cell in sheet[start_index_row + 1]]

    # Process entity data using the helper function
    attributes_dict = self.process_entity(
        sheet,
        start_index_row,
        header_terms,
        list(self.VALIDATION_RULES[entity_type].keys()),
        entity_type,
    )

    # Extract additional attributes if necessary
    if entity_type in {
        "SAMPLE_TYPE",
        "OBJECT_TYPE",
        "EXPERIMENT_TYPE",
        "DATASET_TYPE",
    }:
        attributes_dict["properties"] = (
            self.properties_to_dict(sheet, start_index_row, last_non_empty_row)
            or {}
        )

    elif entity_type == "VOCABULARY_TYPE":
        attributes_dict["terms"] = (
            self.terms_to_dict(sheet, start_index_row, last_non_empty_row) or {}
        )

    # Add the entity to the complete dictionary
    complete_dict[attributes_dict["code"]] = attributes_dict

    # Return sorted dictionary
    return dict(sorted(complete_dict.items(), key=lambda item: item[0].count(".")))

excel_to_entities()

Extracts entities from an Excel file and returns them as a dictionary.

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

dict[str, dict[str, Any]]: A dictionary where each key is a normalized sheet name and the value is a dictionary

dict[str, dict[str, Any]]

containing the extracted entities. Returns an empty dictionary if all sheets are empty.

Source code in bam_masterdata/excel/excel_to_entities.py
def excel_to_entities(self) -> dict[str, dict[str, Any]]:
    """
    Extracts entities from an Excel file and returns them as a dictionary.

    Returns:
        dict[str, dict[str, Any]]: A dictionary where each key is a normalized sheet name and the value is a dictionary
        containing the extracted entities. Returns an empty dictionary if all sheets are empty.
    """
    sheets_dict: dict[str, dict[str, Any]] = {}
    sheet_names = self.workbook.sheetnames
    has_content = False  # Track if any sheet has valid content

    for i, sheet_name in enumerate(sheet_names):
        normalized_sheet_name = sheet_name.lower().replace(" ", "_")
        sheet = self.workbook[sheet_name]
        start_row = 1

        # **Check if the sheet is empty**
        if all(
            sheet.cell(row=row, column=col).value in (None, "")
            for row in range(1, sheet.max_row + 1)
            for col in range(1, sheet.max_column + 1)
        ):
            self.logger.info(f"Skipping empty sheet: {sheet_name}")
            continue  # Move to the next sheet

        sheets_dict[normalized_sheet_name] = {}

        consecutive_empty_rows = 0  # Track consecutive empty rows
        while start_row <= sheet.max_row:
            # **Check for two consecutive empty rows**
            is_row_empty = all(
                sheet.cell(row=start_row, column=col).value in (None, "")
                for col in range(1, sheet.max_column + 1)
            )

            if is_row_empty:
                consecutive_empty_rows += 1
                if consecutive_empty_rows >= 2:
                    # **Reached the end of the sheet, move to the next**
                    if i == len(sheet_names) - 1:
                        self.logger.info(
                            f"Last sheet {sheet_name} processed. End of the file reached."
                        )
                    else:
                        self.logger.info(
                            f"End of the current sheet {sheet_name} reached. Switching to next sheet..."
                        )
                    break  # Stop processing this sheet
            else:
                consecutive_empty_rows = 0  # Reset if we find a non-empty row

                # **Process the entity block**
                last_non_empty_row = self.get_last_non_empty_row(sheet, start_row)
                if last_non_empty_row is None:
                    break  # No more valid blocks

                sheets_dict[normalized_sheet_name] = self.block_to_entity_dict(
                    sheet,
                    start_row,
                    last_non_empty_row,
                    sheets_dict[normalized_sheet_name],
                )
                has_content = True  # Found valid content

                # Move to the next entity block
                start_row = last_non_empty_row + 1
                continue  # Continue loop without increasing consecutive_empty_rows

            start_row += 1  # Move to the next row

    # **If no sheets had content, return an empty dictionary**
    if not has_content:
        self.logger.warning(
            "No valid data found in any sheets. Returning empty dictionary."
        )
        return {}

    return sheets_dict

bam_masterdata.openbis.login

ologin(url='')

Connect to openBIS using the credentials stored in the environment variables.

PARAMETER DESCRIPTION
url

The URL of the openBIS instance. Defaults to the value of the OPENBIS_URL environment variable.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Openbis

Openbis object for the specific openBIS instance defined in URL.

TYPE: Openbis

Source code in bam_masterdata/openbis/login.py
def ologin(url: str = "") -> Openbis:
    """
    Connect to openBIS using the credentials stored in the environment variables.

    Args:
        url (str): The URL of the openBIS instance. Defaults to the value of the `OPENBIS_URL` environment variable.

    Returns:
        Openbis: Openbis object for the specific openBIS instance defined in `URL`.
    """
    o = Openbis(url)
    o.login(environ("OPENBIS_USERNAME"), environ("OPENBIS_PASSWORD"), save_token=True)
    return o

bam_masterdata.openbis.get_entities

OpenbisEntities

Class to get openBIS entities and their attributes as dictionaries to be printed in the Python modules of bam_masterdata/datamodel/.

Source code in bam_masterdata/openbis/get_entities.py
class OpenbisEntities:
    """
    Class to get openBIS entities and their attributes as dictionaries to be printed in the
    Python modules of `bam_masterdata/datamodel/`.
    """

    def __init__(self, url: str = ""):
        self.openbis = ologin(url=url)

    def _get_formatted_dict(self, entity_name: str):
        # entity_name is property_types, collection_types, dataset_types, object_types, or vocabularies
        entity_types = getattr(self.openbis, f"get_{entity_name}")().df.to_dict(
            orient="records"
        )
        return {entry["code"]: entry for entry in entity_types}

    def _assign_properties(self, entity_name: str, formatted_dict: dict) -> None:
        for entity in getattr(self.openbis, f"get_{entity_name}")():
            perm_id = entity.permId  # Unique identifier for the entity
            assignments = entity.get_property_assignments()

            if assignments:
                # Convert property assignments to list of dictionaries
                assignments_dict = assignments.df.to_dict(orient="records")

                # Create a dictionary of properties using the correct permId
                properties = {}
                for entry in assignments_dict:
                    property_perm_id = self.openbis.get_property_type(
                        entry.get("code", {})
                    ).permId
                    if property_perm_id:
                        # Include the desired property fields
                        properties[property_perm_id] = {
                            "@type": entry.get(
                                "@type", "as.dto.property.PropertyAssignment"
                            ),
                            "@id": entry.get("@id", None),
                            "fetchOptions": entry.get("fetchOptions", None),
                            "permId": property_perm_id,
                            "section": entry.get("section", ""),
                            "ordinal": entry.get("ordinal", None),
                            "mandatory": entry.get("mandatory", False),
                            "showInEditView": entry.get("showInEditView", False),
                            "showRawValueInForms": entry.get(
                                "showRawValueInForms", False
                            ),
                            "semanticAnnotations": entry.get(
                                "semanticAnnotations", None
                            ),
                            "semanticAnnotationsInherited": entry.get(
                                "semanticAnnotationsInherited", False
                            ),
                            "registrator": entry.get("registrator", None),
                            "registrationDate": entry.get("registrationDate", None),
                            "plugin": entry.get("plugin", ""),
                        }
                for prop in assignments:
                    prop = prop.get_property_type()
                    properties[prop.permId].update(
                        {
                            "label": prop.label,
                            "description": prop.description,
                            "dataType": prop.dataType,
                        }
                    )

                # Add properties to the object type in formatted_dict
                formatted_dict[perm_id]["properties"] = properties
            else:
                # If no properties, add an empty dictionary
                formatted_dict[perm_id]["properties"] = {}

    def get_property_dict(self) -> dict:
        """
        Get the property types from openBIS and return them as a dictionary where the keys
        are the property type `code` and the value is a dictionary of attributes assigned to that
        property type.

        Returns:
            dict: Dictionary of property types with their attributes.
        """
        formatted_dict = self._get_formatted_dict("property_types")

        # We return the sorted dictionary in order to have a consistent order for inheritance
        return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

    def get_collection_dict(self) -> dict:
        """
        Get the collection types from openBIS and return them as a dictionary where the keys
        are the collection type `code` and the value is a dictionary of attributes assigned to that
        collection type.

        Returns:
            dict: Dictionary of collection types with their attributes.
        """
        formatted_dict = self._get_formatted_dict("collection_types")
        self._assign_properties(
            entity_name="collection_types", formatted_dict=formatted_dict
        )

        # We return the sorted dictionary in order to have a consistent order for inheritance
        return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

    def get_dataset_dict(self) -> dict:
        """
        Get the dataset types from openBIS and return them as a dictionary where the keys
        are the dataset type `code` and the value is a dictionary of attributes assigned to that
        dataset type.

        Returns:
            dict: Dictionary of dataset types with their attributes.
        """
        formatted_dict = self._get_formatted_dict("dataset_types")
        self._assign_properties(
            entity_name="dataset_types", formatted_dict=formatted_dict
        )

        # We return the sorted dictionary in order to have a consistent order for inheritance
        return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

    def get_object_dict(self) -> dict:
        """
        Get the object types from openBIS and return them as a dictionary where the keys
        are the object type `code` and the value is a dictionary of attributes assigned to that
        object type.

        Returns:
            dict: Dictionary of object types with their attributes.
        """
        formatted_dict = self._get_formatted_dict("object_types")
        self._assign_properties(
            entity_name="object_types", formatted_dict=formatted_dict
        )

        # We return the sorted dictionary in order to have a consistent order for inheritance
        return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

    def get_vocabulary_dict(self) -> dict:
        """
        Get the vocabulary types from openBIS and return them as a dictionary where the keys
        are the vocabulary type `code` and the value is a dictionary of attributes assigned to that
        vocabulary type.

        Returns:
            dict: Dictionary of vocabulary types with their attributes.
        """
        formatted_dict = self._get_formatted_dict("vocabularies")

        # Add properties to each object type
        for voc in self.openbis.get_vocabularies():
            code = voc.code  # Unique identifier for the object type

            # ! we need this for parsing!!
            # # BAM_FLOOR, BAM_HOUSE, BAM_LOCATION, BAM_LOCATION_COMPLETE, BAM_OE, BAM_ROOM, PERSON_STATUS
            # # are not exported due to containing sensitive information
            # if code in [
            #     "BAM_FLOOR",
            #     "BAM_HOUSE",
            #     "BAM_LOCATION",
            #     "BAM_LOCATION_COMPLETE",
            #     "BAM_OE",
            #     "BAM_ROOM",
            #     "PERSON_STATUS",
            # ]:
            #     continue
            terms = voc.get_terms()

            if terms:
                # Convert property assignments to list of dictionaries
                terms_dict = terms.df.to_dict(orient="records")

                # Create a dictionary of properties using the correct permId
                voc_terms = {}
                for entry in terms_dict:
                    term_code = entry.get("code", {})
                    if term_code:
                        # Include the desired property fields
                        voc_terms[term_code] = {
                            "code": term_code,
                            "description": entry.get("description", ""),
                            "label": entry.get("label", ""),
                        }

                # Add properties to the object type in formatted_dict
                formatted_dict[code]["terms"] = voc_terms
            else:
                # If no properties, add an empty dictionary
                formatted_dict[code]["terms"] = {}

        # We return the sorted dictionary in order to have a consistent order for inheritance
        return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

openbis = ologin(url=url)

__init__(url='')

Source code in bam_masterdata/openbis/get_entities.py
def __init__(self, url: str = ""):
    self.openbis = ologin(url=url)

get_property_dict()

Get the property types from openBIS and return them as a dictionary where the keys are the property type code and the value is a dictionary of attributes assigned to that property type.

RETURNS DESCRIPTION
dict

Dictionary of property types with their attributes.

TYPE: dict

Source code in bam_masterdata/openbis/get_entities.py
def get_property_dict(self) -> dict:
    """
    Get the property types from openBIS and return them as a dictionary where the keys
    are the property type `code` and the value is a dictionary of attributes assigned to that
    property type.

    Returns:
        dict: Dictionary of property types with their attributes.
    """
    formatted_dict = self._get_formatted_dict("property_types")

    # We return the sorted dictionary in order to have a consistent order for inheritance
    return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

get_collection_dict()

Get the collection types from openBIS and return them as a dictionary where the keys are the collection type code and the value is a dictionary of attributes assigned to that collection type.

RETURNS DESCRIPTION
dict

Dictionary of collection types with their attributes.

TYPE: dict

Source code in bam_masterdata/openbis/get_entities.py
def get_collection_dict(self) -> dict:
    """
    Get the collection types from openBIS and return them as a dictionary where the keys
    are the collection type `code` and the value is a dictionary of attributes assigned to that
    collection type.

    Returns:
        dict: Dictionary of collection types with their attributes.
    """
    formatted_dict = self._get_formatted_dict("collection_types")
    self._assign_properties(
        entity_name="collection_types", formatted_dict=formatted_dict
    )

    # We return the sorted dictionary in order to have a consistent order for inheritance
    return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

get_dataset_dict()

Get the dataset types from openBIS and return them as a dictionary where the keys are the dataset type code and the value is a dictionary of attributes assigned to that dataset type.

RETURNS DESCRIPTION
dict

Dictionary of dataset types with their attributes.

TYPE: dict

Source code in bam_masterdata/openbis/get_entities.py
def get_dataset_dict(self) -> dict:
    """
    Get the dataset types from openBIS and return them as a dictionary where the keys
    are the dataset type `code` and the value is a dictionary of attributes assigned to that
    dataset type.

    Returns:
        dict: Dictionary of dataset types with their attributes.
    """
    formatted_dict = self._get_formatted_dict("dataset_types")
    self._assign_properties(
        entity_name="dataset_types", formatted_dict=formatted_dict
    )

    # We return the sorted dictionary in order to have a consistent order for inheritance
    return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

get_object_dict()

Get the object types from openBIS and return them as a dictionary where the keys are the object type code and the value is a dictionary of attributes assigned to that object type.

RETURNS DESCRIPTION
dict

Dictionary of object types with their attributes.

TYPE: dict

Source code in bam_masterdata/openbis/get_entities.py
def get_object_dict(self) -> dict:
    """
    Get the object types from openBIS and return them as a dictionary where the keys
    are the object type `code` and the value is a dictionary of attributes assigned to that
    object type.

    Returns:
        dict: Dictionary of object types with their attributes.
    """
    formatted_dict = self._get_formatted_dict("object_types")
    self._assign_properties(
        entity_name="object_types", formatted_dict=formatted_dict
    )

    # We return the sorted dictionary in order to have a consistent order for inheritance
    return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

get_vocabulary_dict()

Get the vocabulary types from openBIS and return them as a dictionary where the keys are the vocabulary type code and the value is a dictionary of attributes assigned to that vocabulary type.

RETURNS DESCRIPTION
dict

Dictionary of vocabulary types with their attributes.

TYPE: dict

Source code in bam_masterdata/openbis/get_entities.py
def get_vocabulary_dict(self) -> dict:
    """
    Get the vocabulary types from openBIS and return them as a dictionary where the keys
    are the vocabulary type `code` and the value is a dictionary of attributes assigned to that
    vocabulary type.

    Returns:
        dict: Dictionary of vocabulary types with their attributes.
    """
    formatted_dict = self._get_formatted_dict("vocabularies")

    # Add properties to each object type
    for voc in self.openbis.get_vocabularies():
        code = voc.code  # Unique identifier for the object type

        # ! we need this for parsing!!
        # # BAM_FLOOR, BAM_HOUSE, BAM_LOCATION, BAM_LOCATION_COMPLETE, BAM_OE, BAM_ROOM, PERSON_STATUS
        # # are not exported due to containing sensitive information
        # if code in [
        #     "BAM_FLOOR",
        #     "BAM_HOUSE",
        #     "BAM_LOCATION",
        #     "BAM_LOCATION_COMPLETE",
        #     "BAM_OE",
        #     "BAM_ROOM",
        #     "PERSON_STATUS",
        # ]:
        #     continue
        terms = voc.get_terms()

        if terms:
            # Convert property assignments to list of dictionaries
            terms_dict = terms.df.to_dict(orient="records")

            # Create a dictionary of properties using the correct permId
            voc_terms = {}
            for entry in terms_dict:
                term_code = entry.get("code", {})
                if term_code:
                    # Include the desired property fields
                    voc_terms[term_code] = {
                        "code": term_code,
                        "description": entry.get("description", ""),
                        "label": entry.get("label", ""),
                    }

            # Add properties to the object type in formatted_dict
            formatted_dict[code]["terms"] = voc_terms
        else:
            # If no properties, add an empty dictionary
            formatted_dict[code]["terms"] = {}

    # We return the sorted dictionary in order to have a consistent order for inheritance
    return dict(sorted(formatted_dict.items(), key=lambda item: item[0].count(".")))

bam_masterdata.checker.checker

MasterdataChecker

Source code in bam_masterdata/checker/checker.py
class MasterdataChecker:
    VALID_MODES = {"self", "incoming", "validate", "compare", "all", "individual"}

    def __init__(self):
        """
        Initialize the comparator with validation rules and set the datamodel directory.
        """
        self.current_model: dict = None
        self.new_entities: dict = None
        self.logger = logger
        self.validation_rules: dict = {}

    def load_current_model(self, datamodel_dir: str = "./bam_masterdata/datamodel/"):
        """
        Load and transform the current data model (Pydantic classes) into JSON.

        Uses the default datamodel directory unless overridden.
        """
        self.logger.info(f"Loading current data model from: {datamodel_dir}")
        entities_dict = EntitiesDict(python_path=datamodel_dir, logger=self.logger)
        self.current_model = entities_dict.single_json()

    def load_new_entities(self, source: str):
        """
        Load new entities from various sources (Python classes, Excel, etc.).
        """
        self.logger.info(f"Loading new entities from: {source}")
        loader = SourceLoader(source)
        self.new_entities = loader.load()

    def check(self, mode: str = "all") -> dict:
        """
        Run validations.

        Modes:
        - "self" -> Validate only the current data model.
        - "incoming" -> Validate only the new entity structure.
        - "validate" -> Validate both the current model and new entities.
        - "compare" -> Compare new entities against the current model.
        - "all" -> Run both validation types.
        - "individual" -> Run individual repositories validations.

        Before running, ensure that required models are loaded based on the mode.

        Returns:
            dict: Validation results.
        """
        # Validate mode selection
        if mode not in self.VALID_MODES:
            raise ValueError(f"Invalid mode: {mode}. Choose from {self.VALID_MODES}.")

        # Load required models based on the selected mode
        if (
            mode in ["self", "validate", "compare", "all", "individual"]
            and self.current_model is None
        ):
            self.logger.info("Current model is missing. Loading it from local files.")
            self.load_current_model()

        if (
            mode in ["incoming", "validate", "compare", "all", "individual"]
            and self.new_entities is None
        ):
            raise ValueError(
                "New entities must be loaded before validation in 'incoming', 'validate', 'individual', 'compare', or 'all' modes."
            )

        # Load the validation rules
        if (
            mode in ["self", "incoming", "validate", "all", "individual"]
            and self.validation_rules == {}
        ):
            self.validation_rules = load_validation_rules(self.logger)

        validator = MasterdataValidator(
            self.new_entities, self.current_model, self.validation_rules
        )
        return validator.validate(mode)

VALID_MODES = {'self', 'incoming', 'validate', 'compare', 'all', 'individual'}

current_model = None

new_entities = None

logger = logger

validation_rules = {}

__init__()

Initialize the comparator with validation rules and set the datamodel directory.

Source code in bam_masterdata/checker/checker.py
def __init__(self):
    """
    Initialize the comparator with validation rules and set the datamodel directory.
    """
    self.current_model: dict = None
    self.new_entities: dict = None
    self.logger = logger
    self.validation_rules: dict = {}

load_current_model(datamodel_dir='./bam_masterdata/datamodel/')

Load and transform the current data model (Pydantic classes) into JSON.

Uses the default datamodel directory unless overridden.

Source code in bam_masterdata/checker/checker.py
def load_current_model(self, datamodel_dir: str = "./bam_masterdata/datamodel/"):
    """
    Load and transform the current data model (Pydantic classes) into JSON.

    Uses the default datamodel directory unless overridden.
    """
    self.logger.info(f"Loading current data model from: {datamodel_dir}")
    entities_dict = EntitiesDict(python_path=datamodel_dir, logger=self.logger)
    self.current_model = entities_dict.single_json()

load_new_entities(source)

Load new entities from various sources (Python classes, Excel, etc.).

Source code in bam_masterdata/checker/checker.py
def load_new_entities(self, source: str):
    """
    Load new entities from various sources (Python classes, Excel, etc.).
    """
    self.logger.info(f"Loading new entities from: {source}")
    loader = SourceLoader(source)
    self.new_entities = loader.load()

check(mode='all')

Run validations.

Modes: - "self" -> Validate only the current data model. - "incoming" -> Validate only the new entity structure. - "validate" -> Validate both the current model and new entities. - "compare" -> Compare new entities against the current model. - "all" -> Run both validation types. - "individual" -> Run individual repositories validations.

Before running, ensure that required models are loaded based on the mode.

RETURNS DESCRIPTION
dict

Validation results.

TYPE: dict

Source code in bam_masterdata/checker/checker.py
def check(self, mode: str = "all") -> dict:
    """
    Run validations.

    Modes:
    - "self" -> Validate only the current data model.
    - "incoming" -> Validate only the new entity structure.
    - "validate" -> Validate both the current model and new entities.
    - "compare" -> Compare new entities against the current model.
    - "all" -> Run both validation types.
    - "individual" -> Run individual repositories validations.

    Before running, ensure that required models are loaded based on the mode.

    Returns:
        dict: Validation results.
    """
    # Validate mode selection
    if mode not in self.VALID_MODES:
        raise ValueError(f"Invalid mode: {mode}. Choose from {self.VALID_MODES}.")

    # Load required models based on the selected mode
    if (
        mode in ["self", "validate", "compare", "all", "individual"]
        and self.current_model is None
    ):
        self.logger.info("Current model is missing. Loading it from local files.")
        self.load_current_model()

    if (
        mode in ["incoming", "validate", "compare", "all", "individual"]
        and self.new_entities is None
    ):
        raise ValueError(
            "New entities must be loaded before validation in 'incoming', 'validate', 'individual', 'compare', or 'all' modes."
        )

    # Load the validation rules
    if (
        mode in ["self", "incoming", "validate", "all", "individual"]
        and self.validation_rules == {}
    ):
        self.validation_rules = load_validation_rules(self.logger)

    validator = MasterdataValidator(
        self.new_entities, self.current_model, self.validation_rules
    )
    return validator.validate(mode)

bam_masterdata.checker.masterdata_validator

MasterdataValidator

Source code in bam_masterdata/checker/masterdata_validator.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
class MasterdataValidator:
    def __init__(self, new_entities: dict, current_model: dict, validation_rules: dict):
        """
        Initialize the validator with new and current entity data.

        Args:
            new_entities (dict): The incoming datamodel.
            current_model (dict): The existing datamodel.
            validation_rules (dict): The validation rules to apply.
        """
        self.new_entities = new_entities
        self.current_model = current_model
        self.validation_rules = validation_rules
        self.logger = logger
        self.log_msgs: list = []
        self.validation_results: dict = {}

    def validate(self, mode: str = "all") -> dict:
        """
        Run validations based on mode:
        - "self": Validate current model structure and format.
        - "incoming": Validate new entities structure and format.
        - "validate": Validate both current and incoming models but do not compare.
        - "compare": Validate new entities against the current model.
        - "all": Run both.
        - "individual": Validate new entities and compare them with the current model.

         Returns:
            dict: Validation results.
        """
        self.logger.debug("Starting validation process...", mode=mode)

        # Reset validation results before running checks
        self.validation_results = {
            "current_model": {},
            "incoming_model": {},
            "comparisons": {},
        }

        if mode in ["self", "all", "validate"]:
            self.logger.debug("Validating current model...")
            self._validate_model(self.current_model)
            self._extract_log_messages(
                self.current_model, self.validation_results["current_model"]
            )

        if mode in ["incoming", "all", "validate"]:
            self.logger.debug("Validating new entities...")
            self._validate_model(self.new_entities)
            self._extract_log_messages(
                self.new_entities, self.validation_results["incoming_model"]
            )

        if mode in ["compare", "all"]:
            self.logger.debug("Comparing new entities with current model...")
            self._compare_with_current_model(mode=mode)
            self._extract_log_messages(
                self.new_entities, self.validation_results["comparisons"]
            )

        if mode == "individual":
            self.logger.debug(
                "Validating new entities and comparing them with current model..."
            )
            self.validation_results = {
                "incoming_model": {},
                "comparisons": {},
            }
            self._validate_model(self.new_entities)
            self._extract_log_messages(
                self.new_entities, self.validation_results["incoming_model"]
            )
            self._compare_with_current_model(mode="individual")
            self._extract_log_messages(
                self.new_entities, self.validation_results["comparisons"]
            )

        return self.validation_results

    def _validate_model(self, model: dict) -> dict:
        """
        Validate the given datamodel against the validation rules.

        Args:
            model (dict): The datamodel to validate.

        Returns:
            dict: A dictionary containing ...
        """
        for entity_type, entities in model.items():
            for entity_name, entity_data in entities.items():
                entity_id = entity_data.get("defs", {}).get("code", entity_name)

                # Ensure _log_msgs exists
                if "_log_msgs" not in entity_data:
                    entity_data["_log_msgs"] = []

                if model == self.new_entities:
                    self.logger.info(f"Validating {entity_type} -> {entity_id}")

                # Validate 'defs'
                if "defs" in entity_data:
                    row_location = entity_data["defs"].get("row_location", "Unknown")
                    self._validate_fields(
                        entity_data["defs"],
                        "defs_validation",
                        entity_type,
                        entity_id,
                        row_location,
                        entity_data,
                    )

                # Collect ordered sections for each entity
                entity_sections = []
                # Validate 'properties' (except for vocabulary_types, which uses 'terms')
                if entity_type != "vocabulary_types" and "properties" in entity_data:
                    for prop in entity_data["properties"]:
                        row_location = prop.get("row_location", "Unknown")

                        # Collect section names in order
                        section = prop.get("section", "").strip()
                        if section:  # Avoid empty sections
                            entity_sections.append(
                                {
                                    "code": prop["code"],
                                    "section": section,
                                    "row_location": row_location,
                                }
                            )

                        # Check for deprecated `$ANNOTATIONS_STATE`
                        if (
                            prop["code"] == "$ANNOTATIONS_STATE"
                            and model == self.new_entities
                        ):
                            log_message = (
                                f"Property $ANNOTATIONS_STATE is deprecated from openBIS 20.10.7.3. "
                                f"Assigned to entity '{entity_id}' at row {row_location}."
                            )
                            store_log_message(
                                logger, entity_data, log_message, level="warning"
                            )

                        self._validate_fields(
                            prop,
                            "properties_validation",
                            entity_type,
                            entity_id,
                            row_location,
                            entity_data,
                        )

                # TODO: revise if these checks about ordering of sections are truly necessary
                # Check if "Additional Information" is followed only by "Additional Information" or "Comments"
                for i in range(len(entity_sections) - 1):
                    current_section = entity_sections[i]["section"]
                    next_section = entity_sections[i + 1]["section"]
                    row_location = entity_sections[i + 1]["row_location"]

                    if (
                        current_section == "Additional Information"
                        and next_section not in {"Additional Information", "Comments"}
                    ):
                        log_message = (
                            f"Invalid section order: 'Additional Information' at row {entity_sections[i]['row_location']} "
                            f"must be followed by 'Comments', but found '{next_section}' at row {row_location}."
                        )
                        store_log_message(
                            logger, entity_data, log_message, level="error"
                        )

                # Check if required properties exist in specific sections
                required_properties = {
                    "Additional Information": "NOTES",
                    "Comments": "$XMLCOMMENTS",
                }

                # Track found properties
                found_properties = {section: False for section in required_properties}

                for entry in entity_sections:
                    section = entry["section"]
                    property_code = entry["code"]
                    row_location = entry["row_location"]

                    if (
                        section in required_properties
                        and property_code == required_properties[section]
                    ):
                        found_properties[section] = True

                # Log errors for missing required properties
                for section, prop in required_properties.items():
                    if (
                        any(entry["section"] == section for entry in entity_sections)
                        and not found_properties[section]
                    ):
                        log_message = f"Missing required property '{prop}' in section '{section}'."
                        store_log_message(
                            logger, entity_data, log_message, level="error"
                        )

                # Validate 'terms' (only for vocabulary_types)
                if entity_type == "vocabulary_types" and "terms" in entity_data:
                    for term in entity_data["terms"]:
                        row_location = term.get("row_location", "Unknown")
                        self._validate_fields(
                            term,
                            "terms_validation",
                            entity_type,
                            entity_id,
                            row_location,
                            entity_data,
                        )

        return entity_data

    def _validate_fields(
        self,
        data: dict,
        rule_type: str,
        entity_type: str,
        entity_name: str,
        row_location: str,
        parent_entity: dict,
    ):
        """
        Validate a dictionary of fields against the corresponding validation rules.

        Args:
            data (dict): The fields to validate.
            rule_type (str): The rule section to use ("defs_validation", "properties_validation", or "terms_validation").
            entity_type (str): The entity type being validated.
            entity_name (str): The specific entity name (ID if available).
            row_location (str): The row where the entity is located in the source file.
            parent_entity (dict): The entity dictionary where _log_msgs should be stored.
        """

        # Determine where the issue is occurring (in properties, terms, or main entity fields)
        extra_location = {
            "properties_validation": "in 'properties'.",
            "terms_validation": "in 'terms'.",
        }.get(rule_type, ".")

        for field, value in data.items():
            rule = self.validation_rules.get(rule_type, {}).get(field)

            extra_location_str = f" {extra_location} " if extra_location else " "

            log_message = (
                f"Invalid '{value}' value found in the '{field}' field at line {row_location} "
                f"in entity '{entity_name}' of '{entity_type}'{extra_location_str}"
            )

            if not rule:
                continue  # Skip fields with no validation rules

            # Handle empty fields
            if "allow_empty" in rule and (value is None or value == "" or not value):
                continue  # Skip check if empty fields are allowed

            # Validate pattern (regex)
            if "pattern" in rule and value is not None:
                if not re.match(rule["pattern"], str(value)):
                    log_message = f"{log_message}Invalid format."
                    level = "error"
                    if "is_description" in rule:
                        log_message = f"{log_message} Description should follow the schema: English Description + '//' + German Description. "
                        level = "warning"
                    if "is_section" in rule:
                        log_message = f"{log_message} First letter of every word starts with capitalized lettter."
                        level = "warning"
                    store_log_message(logger, parent_entity, log_message, level=level)

            # Validate boolean fields
            if "is_bool" in rule and str(value).strip().lower() not in [
                "true",
                "false",
            ]:
                store_log_message(
                    logger,
                    parent_entity,
                    f"{log_message}Expected a boolean.",
                    level="error",
                )

            # Validate data types
            if "is_data" in rule and str(value) not in [dt.value for dt in DataType]:
                store_log_message(
                    logger,
                    parent_entity,
                    f"{log_message}The Data Type should be one of the following: {[dt.value for dt in DataType]}",
                    level="error",
                )

            # Validate special cases (e.g., extra validation functions)
            if "extra_validation" in rule:
                validation_func = getattr(self, rule["extra_validation"], None)
                if validation_func == "is_reduced_version" and not is_reduced_version(
                    value, entity_name
                ):
                    store_log_message(
                        logger,
                        parent_entity,
                        f"{log_message}The generated code should be a part of the code.",
                        level="warning",
                    )

    def _compare_with_current_model(self, mode) -> dict:
        """
        Compare new entities against the current model using validation rules.
        """
        self.logger.debug("Starting comparison with the current model...")

        new_entity = False

        all_props = self.extract_property_codes(self.current_model)

        for entity_type, incoming_entities in self.new_entities.items():
            if entity_type not in self.current_model:
                continue  # Skip if entity type does not exist in the current model

            current_entities = self.current_model[entity_type]

            for entity_code, incoming_entity in incoming_entities.items():
                incoming_row_location = "Unknown"
                current_entity = current_entities.get(entity_code)

                # Ensure _log_msgs exists
                if "_log_msgs" not in incoming_entity:
                    incoming_entity["_log_msgs"] = []

                if current_entity:
                    if mode == "individual":
                        log_message = f"The entity {entity_code} already exists in `bam-masterdata`. Please, check your classes. "
                        store_log_message(
                            logger, incoming_entity, log_message, level="critical"
                        )
                    # Compare general attributes for all entities
                    for key, new_value in incoming_entity.get("defs", {}).items():
                        incoming_row_location = incoming_entity.get("defs", {}).get(
                            "row_location", "Unknown"
                        )
                        old_value = current_entity.get("defs", {}).get(key)
                        if (
                            (key != "code" and key != "row_location")
                            and old_value is not None
                            and new_value != old_value
                        ):
                            log_message = (
                                f"Entity type {entity_code} has changed its attribute {key} "
                                f"from '{old_value}' to '{new_value}' at row {incoming_row_location}."
                            )
                            store_log_message(
                                logger, incoming_entity, log_message, level="warning"
                            )

                    # Special case for `property_types`
                    if entity_type == "property_types":
                        incoming_row_location = incoming_entity.get(
                            "row_location", "Unknown"
                        )
                        new_data_type = incoming_entity.get("data_type")
                        old_data_type = current_entity.get("data_type")

                        if (
                            new_data_type
                            and old_data_type
                            and new_data_type != old_data_type
                        ):
                            log_message = (
                                f"Property type {entity_code} has changed its `data_type` value from {old_data_type} to {new_data_type} at row {incoming_row_location}. "
                                "This will cause that data using the Property with inconsistent versions of data type will probably break openBIS. "
                                "You need to define a new property with the new data type or revise your data model."
                            )
                            store_log_message(
                                logger, incoming_entity, log_message, level="critical"
                            )

                        if (
                            new_data_type == "CONTROLLEDVOCABULARY"
                            and incoming_entity.get("vocabulary_code")
                            != current_entity.get("vocabulary_code")
                        ):
                            old_vocabulary = current_entity.get("vocabulary_code")
                            new_vocabulary = incoming_entity.get("vocabulary_code")
                            log_message = (
                                f"Property type {entity_code} using controlled vocabulary has changed its `vocabulary_code` value from {old_vocabulary} to {new_vocabulary}, "
                                f"at row {incoming_row_location} which means that data using a type that is not compatible with the new type will probably break openBIS. "
                                "You need to define a new property with the new data type or revise your data model."
                            )
                            store_log_message(
                                logger, incoming_entity, log_message, level="critical"
                            )

                else:
                    new_entity = True

                # Compare assigned properties or terms
                if "properties" in incoming_entity:
                    self._compare_assigned_properties(
                        entity_code,
                        incoming_entity,
                        current_entity,
                        entity_type,
                        new_entity,
                        incoming_row_location,
                        all_props,
                    )
                elif "terms" in incoming_entity:
                    self._compare_assigned_properties(
                        entity_code,
                        incoming_entity,
                        current_entity,
                        entity_type,
                        new_entity,
                        incoming_row_location,
                        all_props,
                        is_terms=True,
                    )

        if not self.validation_results.get("comparisons"):
            logger.info(
                "No critical conflicts found between new entities compared to the current model."
            )

        return self.validation_results

    def _compare_assigned_properties(
        self,
        entity_code,
        incoming_entity,
        current_entity,
        entity_type,
        new_entity,
        incoming_row_location,
        all_props,
        is_terms=False,
    ):
        """
        Compares assigned properties (for ObjectType, CollectionType, etc.) or terms (for VocabularyType).
        """
        incoming_props = {
            prop["code"]: prop
            for prop in incoming_entity.get(
                "properties" if not is_terms else "terms", []
            )
        }

        incoming_prop_codes = set(incoming_props.keys())

        if not new_entity:
            current_props = {
                prop["code"]: prop
                for prop in current_entity.get(
                    "properties" if not is_terms else "terms", []
                )
            }

            # Check for non-existing assigned properties
            current_prop_codes = set(current_props.keys())

            for prop_code in incoming_prop_codes:
                if prop_code not in all_props and is_terms is False:
                    log_message = (
                        f"The assigned property {prop_code} to the entity {entity_code} at row {incoming_props[prop_code].get('row_location')} does not exist in openBIS. "
                        "Please, define it in your PropertyType section."
                    )
                    store_log_message(
                        logger, incoming_entity, log_message, level="error"
                    )

            # Check for existing changes in assigned properties
            missing_properties = incoming_prop_codes - current_prop_codes
            deleted_properties = current_prop_codes - incoming_prop_codes

            if missing_properties or deleted_properties:
                log_message = f"The assigned properties to {entity_code} at row {incoming_row_location} have changed:"
                store_log_message(logger, incoming_entity, log_message, level="warning")

            # Check for missing properties
            for missing in missing_properties:
                log_message = f"{missing} has been added as a new property at row {incoming_props[missing].get('row_location')}."
                store_log_message(logger, incoming_entity, log_message, level="info")

            # Check for deleted properties
            for deleted in deleted_properties:
                log_message = f"{deleted} has been deleted."
                store_log_message(logger, incoming_entity, log_message, level="warning")

            # Check for property modifications
            common_props = incoming_prop_codes & current_prop_codes
            for prop_code in common_props:
                new_prop = incoming_props[prop_code]
                old_prop = current_props[prop_code]

                for key, new_value in new_prop.items():
                    old_value = old_prop.get(key)
                    if (
                        (key != "code" and key != "row_location")
                        and old_value is not None
                        and new_value != old_value
                    ):
                        log_message = (
                            f"Assigned property {prop_code} to entity type {entity_code} has changed its attribute {key} "
                            f"from '{old_value}' to '{new_value}' at row {incoming_props[prop_code].get('row_location')}."
                        )
                        store_log_message(
                            logger, incoming_entity, log_message, level="warning"
                        )

        # Check if assigned properties match another entity's properties
        for other_entity_code, other_entity in self.current_model.get(
            entity_type, {}
        ).items():
            if other_entity_code != entity_code:
                other_entity_properties = (
                    other_entity.get("properties", [])
                    if not is_terms
                    else other_entity.get("terms", [])
                )
                other_entity_props = {prop["code"] for prop in other_entity_properties}

                if (incoming_prop_codes == other_entity_props) and incoming_prop_codes:
                    log_message = (
                        "Entity will not be imported in openBIS. "
                        f"The entity {entity_code} at row {incoming_entity['defs'].get('row_location')} has the same properties defined as {other_entity_code}. "
                        "Maybe they are representing the same entity?"
                    )
                    store_log_message(
                        logger, incoming_entity, log_message, level="warning"
                    )

    def _extract_log_messages(self, model: dict, target_dict: dict) -> None:
        """
        Extracts and appends _log_msgs from the validated entities into an existing dictionary.

        Args:
            model (dict): The validated entity model.
            target_dict (dict): The dictionary where logs should be appended.
        """
        for entity_type, entities in model.items():
            if entity_type not in target_dict:
                target_dict[entity_type] = {}

            for entity_name, entity_data in entities.items():
                if "_log_msgs" in entity_data and entity_data["_log_msgs"]:
                    if entity_name not in target_dict[entity_type]:
                        target_dict[entity_type][entity_name] = {"_log_msgs": []}

                    # Append new messages to the existing ones
                    target_dict[entity_type][entity_name]["_log_msgs"].extend(
                        entity_data["_log_msgs"]
                    )

    def extract_property_codes(self, data):
        codes = set()

        # Check if the data contains 'properties' and extract 'code'
        if isinstance(data, dict):
            for key, value in data.items():
                # If the key is 'properties', collect all the 'code' values
                if key == "properties" and isinstance(value, list):
                    for property_item in value:
                        if "code" in property_item:
                            codes.add(property_item["code"])
                # Recursively check for more nested structures
                elif isinstance(value, dict | list):
                    codes.update(self.extract_property_codes(value))

        elif isinstance(data, list):
            for item in data:
                codes.update(self.extract_property_codes(item))

        return codes

new_entities = new_entities

current_model = current_model

validation_rules = validation_rules

logger = logger

log_msgs = []

validation_results = {}

__init__(new_entities, current_model, validation_rules)

Initialize the validator with new and current entity data.

PARAMETER DESCRIPTION
new_entities

The incoming datamodel.

TYPE: dict

current_model

The existing datamodel.

TYPE: dict

validation_rules

The validation rules to apply.

TYPE: dict

Source code in bam_masterdata/checker/masterdata_validator.py
def __init__(self, new_entities: dict, current_model: dict, validation_rules: dict):
    """
    Initialize the validator with new and current entity data.

    Args:
        new_entities (dict): The incoming datamodel.
        current_model (dict): The existing datamodel.
        validation_rules (dict): The validation rules to apply.
    """
    self.new_entities = new_entities
    self.current_model = current_model
    self.validation_rules = validation_rules
    self.logger = logger
    self.log_msgs: list = []
    self.validation_results: dict = {}

validate(mode='all')

Run validations based on mode: - "self": Validate current model structure and format. - "incoming": Validate new entities structure and format. - "validate": Validate both current and incoming models but do not compare. - "compare": Validate new entities against the current model. - "all": Run both. - "individual": Validate new entities and compare them with the current model.

Returns: dict: Validation results.

Source code in bam_masterdata/checker/masterdata_validator.py
def validate(self, mode: str = "all") -> dict:
    """
    Run validations based on mode:
    - "self": Validate current model structure and format.
    - "incoming": Validate new entities structure and format.
    - "validate": Validate both current and incoming models but do not compare.
    - "compare": Validate new entities against the current model.
    - "all": Run both.
    - "individual": Validate new entities and compare them with the current model.

     Returns:
        dict: Validation results.
    """
    self.logger.debug("Starting validation process...", mode=mode)

    # Reset validation results before running checks
    self.validation_results = {
        "current_model": {},
        "incoming_model": {},
        "comparisons": {},
    }

    if mode in ["self", "all", "validate"]:
        self.logger.debug("Validating current model...")
        self._validate_model(self.current_model)
        self._extract_log_messages(
            self.current_model, self.validation_results["current_model"]
        )

    if mode in ["incoming", "all", "validate"]:
        self.logger.debug("Validating new entities...")
        self._validate_model(self.new_entities)
        self._extract_log_messages(
            self.new_entities, self.validation_results["incoming_model"]
        )

    if mode in ["compare", "all"]:
        self.logger.debug("Comparing new entities with current model...")
        self._compare_with_current_model(mode=mode)
        self._extract_log_messages(
            self.new_entities, self.validation_results["comparisons"]
        )

    if mode == "individual":
        self.logger.debug(
            "Validating new entities and comparing them with current model..."
        )
        self.validation_results = {
            "incoming_model": {},
            "comparisons": {},
        }
        self._validate_model(self.new_entities)
        self._extract_log_messages(
            self.new_entities, self.validation_results["incoming_model"]
        )
        self._compare_with_current_model(mode="individual")
        self._extract_log_messages(
            self.new_entities, self.validation_results["comparisons"]
        )

    return self.validation_results

extract_property_codes(data)

Source code in bam_masterdata/checker/masterdata_validator.py
def extract_property_codes(self, data):
    codes = set()

    # Check if the data contains 'properties' and extract 'code'
    if isinstance(data, dict):
        for key, value in data.items():
            # If the key is 'properties', collect all the 'code' values
            if key == "properties" and isinstance(value, list):
                for property_item in value:
                    if "code" in property_item:
                        codes.add(property_item["code"])
            # Recursively check for more nested structures
            elif isinstance(value, dict | list):
                codes.update(self.extract_property_codes(value))

    elif isinstance(data, list):
        for item in data:
            codes.update(self.extract_property_codes(item))

    return codes

bam_masterdata.checker.source_loader

SourceLoader

Load the entities from a source written in different formats (Python classes, Excel, etc.) as defined in the source_path into a dictionary.

Source code in bam_masterdata/checker/source_loader.py
class SourceLoader:
    """
    Load the entities from a source written in different formats (Python classes, Excel, etc.) as defined
    in the `source_path` into a dictionary.
    """

    def __init__(self, source_path: str, **kwargs):
        self.source_path = source_path
        self.logger = kwargs.get("logger", logger)
        self.row_cell_info = kwargs.get("row_cell_info", True)
        # Check if the path is a single .py file OR a directory containing .py files
        if self.source_path.endswith(".py") or (
            os.path.isdir(self.source_path)
            and any(glob.glob(os.path.join(self.source_path, "*.py")))
        ):
            self.source_type = "python"
        elif self.source_path.endswith(".xlsx"):
            self.source_type = "excel"
        else:
            self.source_type = None
            self.logger.warning(f"Unsupported source type for path: {source_path}")

    def load(self) -> dict:
        """
        Load entities from the source path into a dictionary.

        Returns:
            dict: A dictionary containing the entities.
        """
        self.logger.info(f"Source type: {self.source_type}")
        if self.source_type == "python":
            return convert_enums(
                EntitiesDict(python_path=self.source_path).single_json()
            )
        elif self.source_type == "excel":
            return self.entities_to_json()
        else:
            raise NotImplementedError(f"Source type {self.source_type} not supported.")

    def entities_to_json(self) -> dict:
        """
        Transforms the dictionary of entities returned by the Excel extractor into a dictionary in JSON format for later check.

        Returns:
            dict: A dictionary containing the transformed entities.
        """

        excel_entities = MasterdataExcelExtractor(
            excel_path=self.source_path, row_cell_info=self.row_cell_info
        ).excel_to_entities()

        transformed_data = {}

        for entity_type, entities in excel_entities.items():
            transformed_data[entity_type] = {}

            for entity_name, entity_data in entities.items():
                if entity_type == "vocabulary_types":
                    transformed_entity = {
                        "terms": [],  # Now placed before "defs"
                        "defs": {  # Metadata moved to the end
                            "code": entity_data.get("code"),
                            "description": entity_data.get("description", ""),
                            "id": format_json_id(
                                entity_name
                            ),  # PascalCase for entity ID
                            "row_location": entity_data.get("row_location"),
                            "url_template": entity_data.get("url_template") or None,
                        },
                    }
                else:
                    transformed_entity = {
                        "properties": [],  # Now placed before "defs"
                        "defs": {  # Metadata moved to the end
                            "code": entity_data.get("code"),
                            "description": entity_data.get("description", ""),
                            "id": format_json_id(
                                entity_name
                            ),  # PascalCase for entity ID
                            "row_location": entity_data.get("row_location"),
                            "validation_script": entity_data.get("validationPlugin")
                            or None,  # Convert "" to None
                            "iri": entity_data.get("iri") or None,  # Convert "" to None
                        },
                    }

                # Handle additional fields specific to dataset_types
                if entity_type == "dataset_types":
                    transformed_entity["defs"]["main_dataset_pattern"] = (
                        entity_data.get("main_dataset_pattern")
                    )
                    transformed_entity["defs"]["main_dataset_path"] = entity_data.get(
                        "main_dataset_path"
                    )

                # Handle additional fields specific to object_types
                if entity_type == "object_types":
                    transformed_entity["defs"]["generated_code_prefix"] = (
                        entity_data.get("generatedCodePrefix")
                    )
                    transformed_entity["defs"]["auto_generate_codes"] = entity_data.get(
                        "autoGeneratedCode"
                    )

                # Convert properties from dict to list
                if "properties" in entity_data:
                    for prop_name, prop_data in entity_data["properties"].items():
                        transformed_property = {
                            "code": prop_data.get("code"),
                            "description": prop_data.get("description", ""),
                            "id": format_json_id(
                                prop_name
                            ),  # Now correctly formatted to PascalCase
                            "row_location": prop_data.get("row_location"),
                            "iri": prop_data.get("iri") or None,  # Convert "" to None
                            "property_label": prop_data.get("label"),
                            "data_type": prop_data.get("dataType"),
                            "vocabulary_code": prop_data.get("vocabularyCode")
                            or None,  # Convert "" to None
                            "object_code": None,
                            "metadata": None,
                            "dynamic_script": None,
                            "mandatory": prop_data.get("mandatory", False),
                            "show_in_edit_views": prop_data.get(
                                "show_in_edit_views", False
                            ),
                            "section": prop_data.get("section", ""),
                            "unique": None,
                            "internal_assignment": None,
                        }
                        transformed_entity["properties"].append(transformed_property)

                if "terms" in entity_data:
                    for term_name, term_data in entity_data["terms"].items():
                        transformed_term = {
                            "code": term_data.get("code"),
                            "description": term_data.get("description", ""),
                            "id": format_json_id(
                                term_name
                            ),  # Now correctly formatted to PascalCase
                            "row_location": term_data.get("row_location"),
                            "url_template": term_data.get("url_template")
                            or None,  # Convert "" to None
                            "label": term_data.get("label"),
                            "official": term_data.get("official"),
                        }
                        transformed_entity["terms"].append(transformed_term)

                transformed_data[entity_type][entity_name] = transformed_entity

        return transformed_data

source_path = source_path

logger = kwargs.get('logger', logger)

row_cell_info = kwargs.get('row_cell_info', True)

source_type = 'python'

__init__(source_path, **kwargs)

Source code in bam_masterdata/checker/source_loader.py
def __init__(self, source_path: str, **kwargs):
    self.source_path = source_path
    self.logger = kwargs.get("logger", logger)
    self.row_cell_info = kwargs.get("row_cell_info", True)
    # Check if the path is a single .py file OR a directory containing .py files
    if self.source_path.endswith(".py") or (
        os.path.isdir(self.source_path)
        and any(glob.glob(os.path.join(self.source_path, "*.py")))
    ):
        self.source_type = "python"
    elif self.source_path.endswith(".xlsx"):
        self.source_type = "excel"
    else:
        self.source_type = None
        self.logger.warning(f"Unsupported source type for path: {source_path}")

load()

Load entities from the source path into a dictionary.

RETURNS DESCRIPTION
dict

A dictionary containing the entities.

TYPE: dict

Source code in bam_masterdata/checker/source_loader.py
def load(self) -> dict:
    """
    Load entities from the source path into a dictionary.

    Returns:
        dict: A dictionary containing the entities.
    """
    self.logger.info(f"Source type: {self.source_type}")
    if self.source_type == "python":
        return convert_enums(
            EntitiesDict(python_path=self.source_path).single_json()
        )
    elif self.source_type == "excel":
        return self.entities_to_json()
    else:
        raise NotImplementedError(f"Source type {self.source_type} not supported.")

entities_to_json()

Transforms the dictionary of entities returned by the Excel extractor into a dictionary in JSON format for later check.

RETURNS DESCRIPTION
dict

A dictionary containing the transformed entities.

TYPE: dict

Source code in bam_masterdata/checker/source_loader.py
def entities_to_json(self) -> dict:
    """
    Transforms the dictionary of entities returned by the Excel extractor into a dictionary in JSON format for later check.

    Returns:
        dict: A dictionary containing the transformed entities.
    """

    excel_entities = MasterdataExcelExtractor(
        excel_path=self.source_path, row_cell_info=self.row_cell_info
    ).excel_to_entities()

    transformed_data = {}

    for entity_type, entities in excel_entities.items():
        transformed_data[entity_type] = {}

        for entity_name, entity_data in entities.items():
            if entity_type == "vocabulary_types":
                transformed_entity = {
                    "terms": [],  # Now placed before "defs"
                    "defs": {  # Metadata moved to the end
                        "code": entity_data.get("code"),
                        "description": entity_data.get("description", ""),
                        "id": format_json_id(
                            entity_name
                        ),  # PascalCase for entity ID
                        "row_location": entity_data.get("row_location"),
                        "url_template": entity_data.get("url_template") or None,
                    },
                }
            else:
                transformed_entity = {
                    "properties": [],  # Now placed before "defs"
                    "defs": {  # Metadata moved to the end
                        "code": entity_data.get("code"),
                        "description": entity_data.get("description", ""),
                        "id": format_json_id(
                            entity_name
                        ),  # PascalCase for entity ID
                        "row_location": entity_data.get("row_location"),
                        "validation_script": entity_data.get("validationPlugin")
                        or None,  # Convert "" to None
                        "iri": entity_data.get("iri") or None,  # Convert "" to None
                    },
                }

            # Handle additional fields specific to dataset_types
            if entity_type == "dataset_types":
                transformed_entity["defs"]["main_dataset_pattern"] = (
                    entity_data.get("main_dataset_pattern")
                )
                transformed_entity["defs"]["main_dataset_path"] = entity_data.get(
                    "main_dataset_path"
                )

            # Handle additional fields specific to object_types
            if entity_type == "object_types":
                transformed_entity["defs"]["generated_code_prefix"] = (
                    entity_data.get("generatedCodePrefix")
                )
                transformed_entity["defs"]["auto_generate_codes"] = entity_data.get(
                    "autoGeneratedCode"
                )

            # Convert properties from dict to list
            if "properties" in entity_data:
                for prop_name, prop_data in entity_data["properties"].items():
                    transformed_property = {
                        "code": prop_data.get("code"),
                        "description": prop_data.get("description", ""),
                        "id": format_json_id(
                            prop_name
                        ),  # Now correctly formatted to PascalCase
                        "row_location": prop_data.get("row_location"),
                        "iri": prop_data.get("iri") or None,  # Convert "" to None
                        "property_label": prop_data.get("label"),
                        "data_type": prop_data.get("dataType"),
                        "vocabulary_code": prop_data.get("vocabularyCode")
                        or None,  # Convert "" to None
                        "object_code": None,
                        "metadata": None,
                        "dynamic_script": None,
                        "mandatory": prop_data.get("mandatory", False),
                        "show_in_edit_views": prop_data.get(
                            "show_in_edit_views", False
                        ),
                        "section": prop_data.get("section", ""),
                        "unique": None,
                        "internal_assignment": None,
                    }
                    transformed_entity["properties"].append(transformed_property)

            if "terms" in entity_data:
                for term_name, term_data in entity_data["terms"].items():
                    transformed_term = {
                        "code": term_data.get("code"),
                        "description": term_data.get("description", ""),
                        "id": format_json_id(
                            term_name
                        ),  # Now correctly formatted to PascalCase
                        "row_location": term_data.get("row_location"),
                        "url_template": term_data.get("url_template")
                        or None,  # Convert "" to None
                        "label": term_data.get("label"),
                        "official": term_data.get("official"),
                    }
                    transformed_entity["terms"].append(transformed_term)

            transformed_data[entity_type][entity_name] = transformed_entity

    return transformed_data

bam_masterdata.parsing.parsing

AbstractParser

Bases: ABC

Example Abstract base class for parsers. Each parser should inherit from this class and implement the parse() method to populate collection.

Source code in bam_masterdata/parsing/parsing.py
class AbstractParser(ABC):
    """
    Example Abstract base class for parsers. Each parser should inherit from this class and implement
    the `parse()` method to populate `collection`.
    """

    @abstractmethod
    def parse(
        self,
        files: list[str],
        collection: CollectionType,
        logger: "BoundLoggerLazyProxy",
    ) -> None:
        """
        Parse the input `files` and populate the provided `collection` with object types, their metadata,
        and their relationships.

        Args:
            files (list[str]): List of file paths to be parsed.
            collection (CollectionType): Collection to be populated with parsed data.
            logger (BoundLoggerLazyProxy): Logger for logging messages during parsing.
        """
        pass

parse(files, collection, logger)

Parse the input files and populate the provided collection with object types, their metadata, and their relationships.

PARAMETER DESCRIPTION
files

List of file paths to be parsed.

TYPE: list[str]

collection

Collection to be populated with parsed data.

TYPE: CollectionType

logger

Logger for logging messages during parsing.

TYPE: BoundLoggerLazyProxy

Source code in bam_masterdata/parsing/parsing.py
@abstractmethod
def parse(
    self,
    files: list[str],
    collection: CollectionType,
    logger: "BoundLoggerLazyProxy",
) -> None:
    """
    Parse the input `files` and populate the provided `collection` with object types, their metadata,
    and their relationships.

    Args:
        files (list[str]): List of file paths to be parsed.
        collection (CollectionType): Collection to be populated with parsed data.
        logger (BoundLoggerLazyProxy): Logger for logging messages during parsing.
    """
    pass

bam_masterdata.utils.utils

delete_and_create_dir(directory_path, logger=logger, force_delete=False)

Deletes the directory at directory_path and creates a new one in the same path.

PARAMETER DESCRIPTION
directory_path

The directory path to delete and create the folder.

TYPE: str

logger

The logger to log messages. Default is logger.

TYPE: BoundLoggerLazyProxy DEFAULT: logger

force_delete

If True, the directory will be forcibly deleted if it exists.

TYPE: bool DEFAULT: False

Source code in bam_masterdata/utils/utils.py
def delete_and_create_dir(
    directory_path: str,
    logger: "BoundLoggerLazyProxy" = logger,
    force_delete: bool = False,
) -> None:
    """
    Deletes the directory at `directory_path` and creates a new one in the same path.

    Args:
        directory_path (str): The directory path to delete and create the folder.
        logger (BoundLoggerLazyProxy): The logger to log messages. Default is `logger`.
        force_delete (bool): If True, the directory will be forcibly deleted if it exists.
    """
    if not directory_path:
        logger.warning(
            "The `directory_path` is empty. Please, provide a proper input to the function."
        )
        return None

    if not force_delete:
        logger.info(f"Skipping the deletion of the directory at {directory_path}.")
        if not os.path.exists(directory_path):
            os.makedirs(directory_path)
        return None

    if os.path.exists(directory_path):
        try:
            shutil.rmtree(directory_path)  # ! careful with this line
        except PermissionError:
            logger.error(
                f"Permission denied to delete the directory at {directory_path}."
            )
            return None
    os.makedirs(directory_path)

listdir_py_modules(directory_path, logger=logger)

Recursively goes through the directory_path and returns a list of all .py files that do not start with '_'. If directory_path is a single Python module file, it will return a list with that file.

PARAMETER DESCRIPTION
directory_path

The directory path to search through.

TYPE: str

logger

The logger to log messages. Default is logger.

TYPE: BoundLoggerLazyProxy DEFAULT: logger

RETURNS DESCRIPTION
list[str]

list[str]: A list of all .py files that do not start with '_'

Source code in bam_masterdata/utils/utils.py
def listdir_py_modules(
    directory_path: str, logger: "BoundLoggerLazyProxy" = logger
) -> list[str]:
    """
    Recursively goes through the `directory_path` and returns a list of all .py files that do not start with '_'. If
    `directory_path` is a single Python module file, it will return a list with that file.

    Args:
        directory_path (str): The directory path to search through.
        logger (BoundLoggerLazyProxy): The logger to log messages. Default is `logger`.

    Returns:
        list[str]: A list of all .py files that do not start with '_'
    """
    if not directory_path:
        logger.warning(
            "The `directory_path` is empty. Please, provide a proper input to the function."
        )
        return []

    # In case of a individual Python module file
    if directory_path.endswith(".py"):
        return [directory_path]
    # Use glob to find all .py files recursively in a directory containing all modules
    else:
        files = glob.glob(os.path.join(directory_path, "**", "*.py"), recursive=True)
    if not files:
        logger.info("No Python files found in the directory.")
        return []

    # Filter out files that start with '_'
    # ! sorted in order to avoid using with OS sorting differently
    return sorted(
        [
            f
            for f in files
            if not os.path.basename(f).startswith("_") and "tmp" not in f.split(os.sep)
        ]
    )

import_module(module_path)

Dynamically imports a module from the given file path.

PARAMETER DESCRIPTION
module_path

Path to the Python module file.

TYPE: str

RETURNS DESCRIPTION
module

Imported module object.

TYPE: Any

Source code in bam_masterdata/utils/utils.py
def import_module(module_path: str) -> Any:
    """
    Dynamically imports a module from the given file path.

    Args:
        module_path (str): Path to the Python module file.

    Returns:
        module: Imported module object.
    """
    module_name = os.path.splitext(os.path.basename(module_path))[0]
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

code_to_class_name(code, logger=logger, entity_type='object')

Converts an openBIS code to a class name by capitalizing each word and removing special characters. In the special case the entity is a property type, it retains the full name separated by points instead of only keeping the last name (e.g., "TEM.INSTRUMENT" -> "TemInstrument" instead of "Instrument").

PARAMETER DESCRIPTION
code

The openBIS code to convert to a class name.

TYPE: str

logger

The logger to log messages. Default is logger.

TYPE: BoundLoggerLazyProxy DEFAULT: logger

entity_type

The type of entity to convert. Default is "object".

TYPE: str DEFAULT: 'object'

Returns: str: The class name derived from the openBIS code.

Source code in bam_masterdata/utils/utils.py
def code_to_class_name(
    code: str | None,
    logger: "BoundLoggerLazyProxy" = logger,
    entity_type: str = "object",
) -> str:
    """
    Converts an openBIS `code` to a class name by capitalizing each word and removing special characters. In
    the special case the entity is a property type, it retains the full name separated by points instead of
    only keeping the last name (e.g., "TEM.INSTRUMENT" -> "TemInstrument" instead of "Instrument").

    Args:
        code (str): The openBIS code to convert to a class name.
        logger (BoundLoggerLazyProxy): The logger to log messages. Default is `logger`.
        entity_type (str): The type of entity to convert. Default is "object".
    Returns:
        str: The class name derived from the openBIS code.
    """
    if not code:
        logger.error(
            "The `code` is empty. Please, provide a proper input to the function."
        )
        return ""

    if entity_type == "property":
        code_names = chain.from_iterable(
            [c.split("_") for c in code.lstrip("$").split(".")]
        )
        return "".join(c.capitalize() for c in code_names)
    return "".join(c.capitalize() for c in code.lstrip("$").rsplit(".")[-1].split("_"))

load_validation_rules(logger, file_path=os.path.join(VALIDATION_RULES_DIR, 'validation_rules.json'))

Source code in bam_masterdata/utils/utils.py
def load_validation_rules(
    logger: "BoundLoggerLazyProxy",
    file_path: str = os.path.join(VALIDATION_RULES_DIR, "validation_rules.json"),
):
    if not os.path.exists(file_path):
        logger.error(f"Validation rules file not found: {file_path}")
        raise FileNotFoundError(f"Validation rules file not found: {file_path}")

    try:
        with open(file_path, encoding="utf-8") as file:
            validation_rules = json.load(file)

        logger.info("Validation rules successfully loaded.")

        return validation_rules

    except json.JSONDecodeError as e:
        logger.error(f"Error parsing validation rules JSON: {e}")
        raise ValueError(f"Error parsing validation rules JSON: {e}")

duplicated_property_types(module_path, logger)

Find the duplicated property types in a module specified by module_path and returns a dictionary containing the duplicated property types class names as keys and the lines where they matched as values.

PARAMETER DESCRIPTION
module_path

The path to the module containing the property types.

TYPE: str

logger

The logger to log messages.

TYPE: BoundLoggerLazyProxy

RETURNS DESCRIPTION
dict

A dictionary containing the duplicated property types class names as keys and the

TYPE: dict

dict

lines where they matched as values.

Source code in bam_masterdata/utils/utils.py
def duplicated_property_types(module_path: str, logger: "BoundLoggerLazyProxy") -> dict:
    """
    Find the duplicated property types in a module specified by `module_path` and returns a dictionary
    containing the duplicated property types class names as keys and the lines where they matched as values.

    Args:
        module_path (str): The path to the module containing the property types.
        logger (BoundLoggerLazyProxy): The logger to log messages.

    Returns:
        dict: A dictionary containing the duplicated property types class names as keys and the
        lines where they matched as values.
    """
    duplicated_props: dict = {}
    module = import_module(module_path=module_path)
    source_code = inspect.getsource(module)
    for name, _ in inspect.getmembers(module):
        if name.startswith("_") or name == "PropertyTypeDef":
            continue

        pattern = rf"^\s*{name} *= *PropertyTypeDef"

        # Find all matching line numbers
        matches = [
            i + 1  # Convert to 1-based index
            for i, line in enumerate(source_code.splitlines())
            if re.match(pattern, line)
        ]
        if len(matches) > 1:
            duplicated_props[name] = matches
    if duplicated_props:
        logger.critical(
            f"Found {len(duplicated_props)} duplicated property types. These are stored in a dictionary "
            f"where the keys are the names of the variables in property_types.py and the values are the lines in the module: {duplicated_props}"
        )
    return duplicated_props

format_json_id(value)

Converts snake_case or UPPER_CASE to PascalCase while keeping special cases like '$NAME' untouched.

Source code in bam_masterdata/utils/utils.py
def format_json_id(value):
    """Converts snake_case or UPPER_CASE to PascalCase while keeping special cases like '$NAME' untouched."""
    if value.startswith("$"):
        # Remove "$" and apply PascalCase transformation
        value = value[1:]
    return "".join(
        word.capitalize() for word in re.split(r"[\._]", value)
    )  # PascalCase

convert_enums(obj)

Source code in bam_masterdata/utils/utils.py
def convert_enums(obj):
    if isinstance(obj, dict):
        return {k: convert_enums(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_enums(i) for i in obj]
    elif isinstance(obj, Enum):  # Convert Enum to string
        return obj.value
    return obj

is_reduced_version(generated_code_value, code)

Check if generated_code_value is a reduced version of code.

PARAMETER DESCRIPTION
generated_code_value

The potentially reduced code.

TYPE: str

code

The original full code.

TYPE: str

RETURNS DESCRIPTION
bool

True if generated_code_value is a reduced version of code, False otherwise.

TYPE: bool

Source code in bam_masterdata/utils/utils.py
def is_reduced_version(generated_code_value: str, code: str) -> bool:
    """
    Check if generated_code_value is a reduced version of code.

    Args:
        generated_code_value (str): The potentially reduced code.
        code (str): The original full code.

    Returns:
        bool: True if generated_code_value is a reduced version of code, False otherwise.
    """
    if generated_code_value == "" or code == "":
        return False

    if code.startswith(generated_code_value):
        return True

    # Check if both are single words (no delimiters)
    if not any(delimiter in code for delimiter in "._") and not any(
        delimiter in generated_code_value for delimiter in "._"
    ):
        return True

    # Determine the delimiter in each string
    code_delimiter = "." if "." in code else "_" if "_" in code else None
    generated_delimiter = (
        "."
        if "." in generated_code_value
        else "_"
        if "_" in generated_code_value
        else None
    )

    # If delimiters don't match, return False
    if code_delimiter != generated_delimiter:
        return False

    # Split both strings using the determined delimiter
    generated_parts = generated_code_value.split(code_delimiter)
    original_parts = code.split(code_delimiter)

    # Ensure both have the same number of parts
    return len(generated_parts) == len(original_parts)

store_log_message(logger, entity_ref, message, level='error')

Logs a message and stores it inside the entity's _log_msgs list.

PARAMETER DESCRIPTION
entity_ref

The entity dictionary where _log_msgs should be stored.

TYPE: dict

message

The log message.

TYPE: str

level

Log level ('error', 'warning', 'critical', 'info').

TYPE: str DEFAULT: 'error'

Source code in bam_masterdata/utils/utils.py
def store_log_message(logger, entity_ref, message, level="error"):
    """
    Logs a message and stores it inside the entity's _log_msgs list.

    Args:
        entity_ref (dict): The entity dictionary where _log_msgs should be stored.
        message (str): The log message.
        level (str): Log level ('error', 'warning', 'critical', 'info').
    """
    log_function = {
        "error": logger.error,
        "warning": logger.warning,
        "critical": logger.critical,
        "info": logger.info,
    }.get(level, logger.error)

    # Log the message
    log_function(message)

    # Ensure _log_msgs exists
    if "_log_msgs" not in entity_ref:
        entity_ref["_log_msgs"] = []

    # Append log message
    entity_ref["_log_msgs"].append((level, message))

bam_masterdata.utils.paths

DIRECTORIES = {'datamodel': [Path.cwd() / 'datamodel', Path.cwd() / 'bam_masterdata' / 'datamodel', Path(__file__).parent.parent / 'datamodel'], 'validation_rules_checker': [Path.cwd() / 'bam_masterdata' / 'checker' / 'validation_rules', Path(__file__).parent.parent / 'checker' / 'validation_rules']}

DATAMODEL_DIR = find_dir(possible_locations=(DIRECTORIES['datamodel']))

VALIDATION_RULES_DIR = find_dir(possible_locations=(DIRECTORIES['validation_rules_checker']))

find_dir(possible_locations)

Search for a valid directory in a list of possible locations.

PARAMETER DESCRIPTION
possible_locations

A list of possible locations to search for a directory.

TYPE: list[Path]

RAISES DESCRIPTION
FileNotFoundError

If no valid directory is found.

RETURNS DESCRIPTION
str

The path of the valid directory.

TYPE: str

Source code in bam_masterdata/utils/paths.py
def find_dir(possible_locations: list[Path]) -> str:
    """
    Search for a valid directory in a list of possible locations.

    Args:
        possible_locations (list[Path]): A list of possible locations to search for a directory.

    Raises:
        FileNotFoundError: If no valid directory is found.

    Returns:
        str: The path of the valid directory.
    """
    for path in possible_locations:
        if path.exists():
            return str(path.resolve())

    raise FileNotFoundError("Could not find a valid directory.")