1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Restrict DuckDB dependency to < 1.4.0 (#972)
- Fixed schema evolution support for optional fields in CSV and Parquet formats. Optional fields marked with `required: false` are no longer incorrectly treated as required during validation, enabling proper schema evolution where optional fields can be added to contracts without breaking validation of historical data files (#977)
- Fixed decimals in pydantic model export. Fields marked with `type: decimal` will be mapped to `decimal.Decimal` instead of `float`.
+- Fixed example(s) field mapping for Data Contract Specification importer (#992).

## [0.11.2] - 2025-12-15

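Reviewer note on the #992 entry: a DCS field may carry a plural `examples` list or a singular `example` value, while an ODCS property only has `examples`. A minimal standalone sketch of the mapping rule this PR introduces (the helper name here is hypothetical; the real logic lives in `_convert_field_to_property` below):

def map_examples(example, examples):
    # Mirror the importer's two sequential checks: `examples` is copied
    # as-is, then a singular `example` (if present) wins and is wrapped
    # in a list, because ODCS has no singular field.
    result = None
    if examples is not None:
        result = examples
    if example is not None:
        result = [example]
    return result

assert map_examples(None, ["243c25e5", 9999]) == ["243c25e5", 9999]
assert map_examples("96385074", None) == ["96385074"]
assert map_examples(None, None) is None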
91 changes: 67 additions & 24 deletions datacontract/imports/dcs_importer.py
@@ -29,9 +29,7 @@
class DcsImporter(Importer):
"""Importer for Data Contract Specification (DCS) format."""

-    def import_source(
-        self, source: str, import_args: dict
-    ) -> OpenDataContractStandard:
+    def import_source(self, source: str, import_args: dict) -> OpenDataContractStandard:
import yaml

from datacontract.lint.resources import read_resource
@@ -99,6 +97,7 @@ def convert_dcs_to_odcs(dcs: DataContractSpecification) -> OpenDataContractStand
# Convert links to authoritativeDefinitions
if dcs.links:
from open_data_contract_standard.model import AuthoritativeDefinition
+
odcs.authoritativeDefinitions = [
AuthoritativeDefinition(type=key, url=value) for key, value in dcs.links.items()
]
Expand All @@ -114,8 +113,11 @@ def convert_dcs_to_odcs(dcs: DataContractSpecification) -> OpenDataContractStand
# Convert policies to authoritativeDefinitions
if dcs.terms.policies:
from open_data_contract_standard.model import AuthoritativeDefinition
+
policy_defs = [
-                AuthoritativeDefinition(type=p.name, description=getattr(p, "description", None), url=getattr(p, "url", None))
+                AuthoritativeDefinition(
+                    type=p.name, description=getattr(p, "description", None), url=getattr(p, "url", None)
+                )
for p in dcs.terms.policies
]
if odcs.authoritativeDefinitions:
@@ -176,6 +178,7 @@ def _convert_servers(dcs_servers: Dict[str, DCSServer]) -> List[ODCSServer]:
odcs_server.description = dcs_server.description
if dcs_server.roles:
from open_data_contract_standard.model import Role as ODCSRole
+
odcs_server.roles = [ODCSRole(role=r.name, description=r.description) for r in dcs_server.roles]
if dcs_server.topic:
# Store topic in customProperties since ODCS Server doesn't have a topic field
@@ -217,14 +220,14 @@ def _convert_models_to_schema(models: Dict[str, Model], definitions: Dict[str, F
schema_obj.physicalName = physical_name

# Store namespace in customProperties for Avro export
-        if hasattr(model, 'namespace') and model.namespace:
+        if hasattr(model, "namespace") and model.namespace:
if schema_obj.customProperties is None:
schema_obj.customProperties = []
schema_obj.customProperties.append(CustomProperty(property="namespace", value=model.namespace))

# Convert fields to properties
# Pass model-level primaryKey list to set primaryKey and primaryKeyPosition on fields
-        model_primary_keys = model.primaryKey if hasattr(model, 'primaryKey') and model.primaryKey else []
+        model_primary_keys = model.primaryKey if hasattr(model, "primaryKey") and model.primaryKey else []
if model.fields:
schema_obj.properties = _convert_fields_to_properties(model.fields, model_primary_keys, definitions)

@@ -307,7 +310,7 @@ def _resolve_field_ref(field: Field, definitions: Dict[str, Field]) -> Field:
elif resolved_value is not None:
merged_data[attr] = resolved_value
# Clear ref to avoid infinite recursion
-    merged_data['ref'] = None
+    merged_data["ref"] = None
return Field(**merged_data)


@@ -355,7 +358,7 @@ def _resolve_local_ref(ref_path: str, definitions: Dict[str, Field]) -> Optional
return None

# Remove the #/definitions/ prefix
path_after_definitions = ref_path[len("#/definitions/"):]
path_after_definitions = ref_path[len("#/definitions/") :]

# Check for simple case: #/definitions/name
if "/" not in path_after_definitions:
@@ -439,7 +442,10 @@ def _convert_field_to_property(
prop.classification = field.classification
if field.tags:
prop.tags = field.tags
-
+    if field.examples is not None:
+        prop.examples = field.examples
+    if field.example is not None:
+        prop.examples = [field.example]
# Convert constraints to logicalTypeOptions
logical_type_options = {}
if field.minLength is not None:
@@ -483,7 +489,16 @@ def _convert_field_to_property(
custom_properties.append(CustomProperty(property="scale", value=str(field.scale)))
if field.config:
# Server-specific type overrides physicalType
server_type_keys = ["oracleType", "snowflakeType", "postgresType", "bigqueryType", "databricksType", "sqlserverType", "trinoType", "physicalType"]
server_type_keys = [
"oracleType",
"snowflakeType",
"postgresType",
"bigqueryType",
"databricksType",
"sqlserverType",
"trinoType",
"physicalType",
]
for key in server_type_keys:
if key in field.config:
prop.physicalType = field.config[key]
@@ -511,9 +526,13 @@ def _convert_field_to_property(
# Convert keys/values (for map types) - store types in customProperties
if field.keys or field.values:
if field.keys and field.keys.type:
custom_properties.append(CustomProperty(property="mapKeyType", value=_convert_type_to_logical_type(field.keys.type)))
custom_properties.append(
CustomProperty(property="mapKeyType", value=_convert_type_to_logical_type(field.keys.type))
)
if field.values and field.values.type:
custom_properties.append(CustomProperty(property="mapValueType", value=_convert_type_to_logical_type(field.values.type)))
custom_properties.append(
CustomProperty(property="mapValueType", value=_convert_type_to_logical_type(field.values.type))
)
# For map with struct values, store the value fields in properties
if field.values.fields:
prop.properties = _convert_fields_to_properties(field.values.fields, None, definitions)

# Convert lineage
if field.lineage:
-        if hasattr(field.lineage, 'inputFields') and field.lineage.inputFields:
-            prop.transformSourceObjects = [f"{f.namespace}.{f.name}.{f.field}" if hasattr(f, 'namespace') and f.namespace else f"{f.name}.{f.field}" for f in field.lineage.inputFields]
-        if hasattr(field.lineage, 'transformationDescription') and field.lineage.transformationDescription:
+        if hasattr(field.lineage, "inputFields") and field.lineage.inputFields:
+            prop.transformSourceObjects = [
+                f"{f.namespace}.{f.name}.{f.field}"
+                if hasattr(f, "namespace") and f.namespace
+                else f"{f.name}.{f.field}"
+                for f in field.lineage.inputFields
+            ]
+        if hasattr(field.lineage, "transformationDescription") and field.lineage.transformationDescription:
prop.transformDescription = field.lineage.transformationDescription
-        if hasattr(field.lineage, 'transformationType') and field.lineage.transformationType:
+        if hasattr(field.lineage, "transformationType") and field.lineage.transformationType:
prop.transformLogic = field.lineage.transformationType

return prop
@@ -568,16 +592,16 @@ def _convert_type_to_logical_type(dcs_type: str) -> str:
"timestamp_tz": "timestamp",
"timestamp_ntz": "timestamp",
"date": "date",
"time": "string", # not supported in ODCS
"time": "string", # not supported in ODCS
"datetime": "timestamp",
"array": "array",
"object": "object",
"record": "object",
"struct": "object",
"map": "object",
"bytes": "string", # not supported in ODCS
"bytes": "string", # not supported in ODCS
"binary": "string", # not supported in ODCS
"null": "string", # not supported in ODCS
"null": "string", # not supported in ODCS
}

return type_mapping.get(t, t)
@@ -637,7 +661,9 @@ def _convert_servicelevels(servicelevels: Any) -> List[ServiceLevelAgreementProp
sla_properties.append(
ServiceLevelAgreementProperty(
property="generalAvailability",
-                value=servicelevels.availability.description if hasattr(servicelevels.availability, "description") else str(servicelevels.availability),
+                value=servicelevels.availability.description
+                if hasattr(servicelevels.availability, "description")
+                else str(servicelevels.availability),
)
)


if hasattr(servicelevels, "freshness") and servicelevels.freshness:
freshness = servicelevels.freshness
if hasattr(freshness, "threshold") and freshness.threshold and hasattr(freshness, "timestampField") and freshness.timestampField:
if (
hasattr(freshness, "threshold")
and freshness.threshold
and hasattr(freshness, "timestampField")
and freshness.timestampField
):
value, unit = _parse_iso8601_duration(freshness.threshold)
if value is not None and unit is not None:
sla_properties.append(
@@ -686,7 +717,11 @@ def _convert_servicelevels(servicelevels: Any) -> List[ServiceLevelAgreementProp

if hasattr(servicelevels, "frequency") and servicelevels.frequency:
frequency = servicelevels.frequency
-        freq_value = frequency.interval if hasattr(frequency, "interval") and frequency.interval else (frequency.cron if hasattr(frequency, "cron") else None)
+        freq_value = (
+            frequency.interval
+            if hasattr(frequency, "interval") and frequency.interval
+            else (frequency.cron if hasattr(frequency, "cron") else None)
+        )
if freq_value:
sla_properties.append(
ServiceLevelAgreementProperty(

if hasattr(servicelevels, "support") and servicelevels.support:
support = servicelevels.support
-        support_value = support.time if hasattr(support, "time") and support.time else (support.description if hasattr(support, "description") else None)
+        support_value = (
+            support.time
+            if hasattr(support, "time") and support.time
+            else (support.description if hasattr(support, "description") else None)
+        )
if support_value:
sla_properties.append(
ServiceLevelAgreementProperty(

if hasattr(servicelevels, "backup") and servicelevels.backup:
backup = servicelevels.backup
-        backup_value = backup.interval if hasattr(backup, "interval") and backup.interval else (backup.cron if hasattr(backup, "cron") else None)
+        backup_value = (
+            backup.interval
+            if hasattr(backup, "interval") and backup.interval
+            else (backup.cron if hasattr(backup, "cron") else None)
+        )
if backup_value:
sla_properties.append(
ServiceLevelAgreementProperty(
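Reviewer note: the freshness conversion above calls `_parse_iso8601_duration`, whose body is outside this diff. A minimal sketch of a parser compatible with that call site, assuming it returns a `(value, unit)` tuple and `(None, None)` for anything it cannot parse; the repository's actual helper may support more of the ISO 8601 grammar:

import re

_UNITS = {"D": "days", "H": "hours", "M": "minutes", "S": "seconds"}

def parse_iso8601_duration(duration):
    # Handles simple day-only (e.g. "P2D") and single time-component
    # (e.g. "PT1H", "PT30M") durations; anything else yields (None, None).
    match = re.fullmatch(r"P(?:(\d+)D)?(?:T(\d+)([HMS]))?", duration or "")
    if not match:
        return None, None
    days, time_value, time_unit = match.groups()
    if days is not None:
        return int(days), "days"
    if time_value is not None:
        return int(time_value), _UNITS[time_unit]
    return None, None

assert parse_iso8601_duration("PT1H") == (1, "hours")
assert parse_iso8601_duration("P2D") == (2, "days")
assert parse_iso8601_duration("not-a-duration") == (None, None)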
10 changes: 5 additions & 5 deletions tests/fixtures/markdown/export/expected.md
@@ -35,9 +35,9 @@ Max data processing per day: 10 TiB

| Field | Type | Attributes |
| ----- | ---- | ---------- |
-| order_id | string | *An internal ID that identifies an order in the online shop.*<br>• **businessName:** Order ID<br>• **tags:** ['orders']<br>• **customProperties:** [{'property': 'pii', 'value': 'True'}]<br>• `primaryKey`<br>• **logicalTypeOptions:** {'format': 'uuid'}<br>• `required`<br>• `unique`<br>• **classification:** restricted |
-| order_timestamp | timestamp | *The business timestamp in UTC when the order was successfully registered in the source system and the payment was successful.*<br>• **tags:** ['business-timestamp']<br>• `required` |
-| order_total | integer | *Total amount the smallest monetary unit (e.g., cents).*<br>• `required` |
+| order_id | string | *An internal ID that identifies an order in the online shop.*<br>• **businessName:** Order ID<br>• **tags:** ['orders']<br>• **customProperties:** [{'property': 'pii', 'value': 'True'}]<br>• `primaryKey`<br>• **logicalTypeOptions:** {'format': 'uuid'}<br>• `required`<br>• `unique`<br>• **classification:** restricted<br>• **examples:** ['243c25e5-a081-43a9-aeab-6d5d5b6cb5e2'] |
+| order_timestamp | timestamp | *The business timestamp in UTC when the order was successfully registered in the source system and the payment was successful.*<br>• **tags:** ['business-timestamp']<br>• `required`<br>• **examples:** ['2024-09-09T08:30:00Z'] |
+| order_total | integer | *Total amount the smallest monetary unit (e.g., cents).*<br>• `required`<br>• **examples:** [9999] |
| customer_id | string | *Unique identifier for the customer.*<br>• **logicalTypeOptions:** {'minLength': 10, 'maxLength': 20} |
| customer_email_address | string | *The email address, as entered by the customer.*<br>• **customProperties:** [{'property': 'pii', 'value': 'True'}]<br>• **logicalTypeOptions:** {'format': 'email'}<br>• `required`<br>• **classification:** sensitive<br>• **transformSourceObjects:** ['com.example.service.checkout.checkout_db.orders.email_address']<br>• **quality:** [{'description': 'The email address is not verified and may be invalid.', 'type': 'text'}] |
| processed_timestamp | timestamp | *The timestamp when the record was processed by the data platform.*<br>• **customProperties:** [{'property': 'jsonType', 'value': 'string'}, {'property': 'jsonFormat', 'value': 'date-time'}]<br>• `required` |
@@ -47,8 +47,8 @@ Max data processing per day: 10 TiB
| Field | Type | Attributes |
| ----- | ---- | ---------- |
| line_item_id | string | *Primary key of the lines_item_id table*<br>• `primaryKey`<br>• **primaryKeyPosition:** 2<br>• `required` |
-| order_id | string | *An internal ID that identifies an order in the online shop.*<br>• **businessName:** Order ID<br>• **tags:** ['orders']<br>• **customProperties:** [{'property': 'pii', 'value': 'True'}]<br>• `primaryKey`<br>• **primaryKeyPosition:** 1<br>• **logicalTypeOptions:** {'format': 'uuid'}<br>• **classification:** restricted<br>• **relationships:** [{'type': 'foreignKey', 'to': 'orders.order_id'}] |
-| sku | string | *The purchased article number*<br>• **businessName:** Stock Keeping Unit<br>• **tags:** ['inventory']<br>• **logicalTypeOptions:** {'pattern': '^[A-Za-z0-9]{8,14}$'} |
+| order_id | string | *An internal ID that identifies an order in the online shop.*<br>• **businessName:** Order ID<br>• **tags:** ['orders']<br>• **customProperties:** [{'property': 'pii', 'value': 'True'}]<br>• `primaryKey`<br>• **primaryKeyPosition:** 1<br>• **logicalTypeOptions:** {'format': 'uuid'}<br>• **classification:** restricted<br>• **examples:** ['243c25e5-a081-43a9-aeab-6d5d5b6cb5e2']<br>• **relationships:** [{'type': 'foreignKey', 'to': 'orders.order_id'}] |
+| sku | string | *The purchased article number*<br>• **businessName:** Stock Keeping Unit<br>• **tags:** ['inventory']<br>• **logicalTypeOptions:** {'pattern': '^[A-Za-z0-9]{8,14}$'}<br>• **examples:** ['96385074'] |

## SLA Properties
| Property | Value | Unit |
3 changes: 3 additions & 0 deletions tests/test_export_rdf.py
@@ -109,12 +109,14 @@ def test_to_rdf_complex():
odcs:classification "restricted" ;
odcs:description "An internal ID that identifies an order in the online shop." ;
odcsx:businessName "Order ID" ;
odcsx:examples "243c25e5-a081-43a9-aeab-6d5d5b6cb5e2" ;
odcs:logicalType "string" ;
odcs:name "order_id" ;
odcs:physicalType "text" ],
[ a odcs:Property ;
odcs:description "The purchased article number" ;
odcsx:businessName "Stock Keeping Unit" ;
odcsx:examples "96385074" ;
odcs:logicalType "string" ;
odcs:name "sku" ;
odcs:physicalType "text" ],
@@ -132,6 +134,7 @@ def test_to_rdf_complex():
odcs:classification "restricted" ;
odcs:description "An internal ID that identifies an order in the online shop." ;
odcsx:businessName "Order ID" ;
odcsx:examples "243c25e5-a081-43a9-aeab-6d5d5b6cb5e2" ;
odcs:logicalType "string" ;
odcs:name "order_id" ;
odcs:physicalType "text" ;