diff --git a/src/onepasswordconnectsdk/config.py b/src/onepasswordconnectsdk/config.py index bcde7a0..2581c3c 100644 --- a/src/onepasswordconnectsdk/config.py +++ b/src/onepasswordconnectsdk/config.py @@ -205,6 +205,56 @@ def _vault_uuid_for_field(field: str, vault_tag: dict): ) +def _match_field(field, field_identifier, section_path, sections): + """Check if a field matches the given identifier and section. + + Args: + field: The item field to check. + field_identifier (str): The value to match against (label or id). + section_path (str): The section part of the field path. + sections (dict): Mapping of section labels/ids to section ids. + + Returns: + bool: True if the field matches the identifier and section. + """ + try: + section_id = field.section.id + except AttributeError: + section_id = None + + return ( + section_id is None + or (section_id == sections.get(section_path)) + or section_path in sections.values() + ) + + +def _find_field(item_fields, field_identifier, section_path, sections, match_attr): + """Search for a field by a given attribute (label or id). + + Args: + item_fields (list): The list of fields on the item. + field_identifier (str): The value to match against. + section_path (str): The section part of the field path. + sections (dict): Mapping of section labels/ids to section ids. + match_attr (str): The field attribute to compare against + (e.g. "label" or "id"). + + Returns: + Field or None: The matching field, or None if not found. + """ + for field in item_fields: + if getattr(field, match_attr, None) == field_identifier: + if _match_field(field, field_identifier, section_path, sections): + return field + return None + + +# The order in which field attributes are checked during lookup. +# Label is tried first; if no match, id is tried as a fallback. +FIELD_MATCH_ATTRIBUTES = ("label", "id") + + def _set_values_for_item( client: "Client", parsed_item: ParsedItem, @@ -230,38 +280,49 @@ def _set_values_for_item( {parsed_field.name}" ) - value_found = False - for field in item.fields: - try: - section_id = field.section.id - except AttributeError: - section_id = None - - if field.label == path_parts[1]: - if ( - section_id is None - or (section_id == sections.get(path_parts[0])) - or path_parts[0] in sections.values() - ): - value_found = True - - if config_object: - setattr(config_object, parsed_field.name, field.value) - else: - config_dict[parsed_field.name] = field.value - break - if not value_found: + section_path = path_parts[0] + field_identifier = path_parts[1] + + matched_field = None + for attr in FIELD_MATCH_ATTRIBUTES: + matched_field = _find_field( + item.fields, field_identifier, section_path, sections, attr + ) + if matched_field: + break + + if matched_field: + if config_object: + setattr(config_object, parsed_field.name, matched_field.value) + else: + config_dict[parsed_field.name] = matched_field.value + else: raise UnknownSectionAndFieldTag( - f"There is no section {path_parts[0]} \ - for field {path_parts[1]}" + f"There is no section {section_path} \ + for field {field_identifier}" ) def _convert_sections_to_dict(sections: List[Section]): + """Convert a list of sections into a lookup dict. + + Builds a mapping that supports lookup by both section label and + section id. Label-based keys take priority — an id-based key is + only added when it does not collide with an existing label key. + + Args: + sections (List[Section]): The sections from a 1Password item. + + Returns: + dict: A mapping of section label/id to section id. + """ if not sections: return {} section_dict = {section.label: section.id for section in sections} + for section in sections: + if section.id not in section_dict: + section_dict[section.id] = section.id return section_dict diff --git a/src/tests/test_config.py b/src/tests/test_config.py index 551cbcf..28fc22a 100644 --- a/src/tests/test_config.py +++ b/src/tests/test_config.py @@ -14,6 +14,8 @@ USERNAME_VALUE = "new_user" PASSWORD_VALUE = "password" HOST_VALUE = "http://somehost" +API_KEY_VALUE = "sk-test-abc123" +DB_PORT_VALUE = "5432" class Config: @@ -118,3 +120,127 @@ def test_load_dict(respx_mock): } ] } + +# Item with field ids that differ from their labels, used to test +# the field.id fallback lookup path. +ITEM_NAME3 = "Service Credentials" +ITEM_ID3 = "wepiqdxdzncjtnvmv5fegud4q3" + +item_with_distinct_ids = { + "id": ITEM_ID3, + "title": ITEM_NAME3, + "vault": { + "id": VAULT_ID + }, + "category": "LOGIN", + "sections": [ + { + "id": "Section_A1B2C3", + "label": "api_settings" + } + ], + "fields": [ + { + "id": "Field_X9Y8Z7", + "label": "API Key", + "value": API_KEY_VALUE, + "section": { + "id": "Section_A1B2C3" + } + }, + { + "id": "Field_D4E5F6", + "label": "Database Port", + "value": DB_PORT_VALUE + } + ] +} + + +def test_load_dict_by_field_id(respx_mock): + """load_dict should resolve fields by id when label doesn't match.""" + config_dict = { + "api_key": { + "opitem": ITEM_NAME3, + "opfield": "api_settings.Field_X9Y8Z7", + "opvault": VAULT_ID + }, + "db_port": { + "opitem": ITEM_NAME3, + "opfield": ".Field_D4E5F6", + "opvault": VAULT_ID + } + } + + respx_mock.get( + f"v1/vaults/{VAULT_ID}/items?filter=title eq \"{ITEM_NAME3}\"" + ).mock(return_value=Response(200, json=[item_with_distinct_ids])) + respx_mock.get( + f"v1/vaults/{VAULT_ID}/items/{ITEM_ID3}" + ).mock(return_value=Response(200, json=item_with_distinct_ids)) + + config_values = onepasswordconnectsdk.load_dict(SS_CLIENT, config_dict) + + assert config_values["api_key"] == API_KEY_VALUE + assert config_values["db_port"] == DB_PORT_VALUE + + +def test_load_by_field_id(respx_mock): + """load should resolve fields by id when label doesn't match.""" + + class ConfigById: + api_key: f'opitem:"{ITEM_NAME3}" opfield:api_settings.Field_X9Y8Z7 opvault:{VAULT_ID}' = None + db_port: f'opitem:"{ITEM_NAME3}" opfield:.Field_D4E5F6 opvault:{VAULT_ID}' = None + + respx_mock.get( + f"v1/vaults/{VAULT_ID}/items?filter=title eq \"{ITEM_NAME3}\"" + ).mock(return_value=Response(200, json=[item_with_distinct_ids])) + respx_mock.get( + f"v1/vaults/{VAULT_ID}/items/{ITEM_ID3}" + ).mock(return_value=Response(200, json=item_with_distinct_ids)) + + config_obj = onepasswordconnectsdk.load(SS_CLIENT, ConfigById()) + + assert config_obj.api_key == API_KEY_VALUE + assert config_obj.db_port == DB_PORT_VALUE + + +def test_load_dict_label_takes_priority(respx_mock): + """When both label and id could match, label should win.""" + ambiguous_item = { + "id": "wepiqdxdzncjtnvmv5fegud4q4", + "title": "Ambiguous Item", + "vault": {"id": VAULT_ID}, + "category": "LOGIN", + "fields": [ + { + "id": "shared_ref", + "label": "wrong_label", + "value": "value_from_id_match" + }, + { + "id": "other_id", + "label": "shared_ref", + "value": "value_from_label_match" + } + ] + } + + config_dict = { + "result": { + "opitem": "Ambiguous Item", + "opfield": ".shared_ref", + "opvault": VAULT_ID + } + } + + respx_mock.get( + f"v1/vaults/{VAULT_ID}/items?filter=title eq \"Ambiguous Item\"" + ).mock(return_value=Response(200, json=[ambiguous_item])) + respx_mock.get( + f"v1/vaults/{VAULT_ID}/items/wepiqdxdzncjtnvmv5fegud4q4" + ).mock(return_value=Response(200, json=ambiguous_item)) + + config_values = onepasswordconnectsdk.load_dict(SS_CLIENT, config_dict) + + assert config_values["result"] == "value_from_label_match"