Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 84 additions & 23 deletions src/onepasswordconnectsdk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,56 @@ def _vault_uuid_for_field(field: str, vault_tag: dict):
)


def _match_field(field, field_identifier, section_path, sections):
"""Check if a field matches the given identifier and section.

Args:
field: The item field to check.
field_identifier (str): The value to match against (label or id).
section_path (str): The section part of the field path.
sections (dict): Mapping of section labels/ids to section ids.

Returns:
bool: True if the field matches the identifier and section.
"""
try:
section_id = field.section.id
except AttributeError:
section_id = None

return (
section_id is None
or (section_id == sections.get(section_path))
or section_path in sections.values()
)


def _find_field(item_fields, field_identifier, section_path, sections, match_attr):
"""Search for a field by a given attribute (label or id).

Args:
item_fields (list): The list of fields on the item.
field_identifier (str): The value to match against.
section_path (str): The section part of the field path.
sections (dict): Mapping of section labels/ids to section ids.
match_attr (str): The field attribute to compare against
(e.g. "label" or "id").

Returns:
Field or None: The matching field, or None if not found.
"""
for field in item_fields:
if getattr(field, match_attr, None) == field_identifier:
if _match_field(field, field_identifier, section_path, sections):
return field
return None


# The order in which field attributes are checked during lookup.
# Label is tried first; if no match, id is tried as a fallback.
FIELD_MATCH_ATTRIBUTES = ("label", "id")


def _set_values_for_item(
client: "Client",
parsed_item: ParsedItem,
Expand All @@ -230,38 +280,49 @@ def _set_values_for_item(
{parsed_field.name}"
)

value_found = False
for field in item.fields:
try:
section_id = field.section.id
except AttributeError:
section_id = None

if field.label == path_parts[1]:
if (
section_id is None
or (section_id == sections.get(path_parts[0]))
or path_parts[0] in sections.values()
):
value_found = True

if config_object:
setattr(config_object, parsed_field.name, field.value)
else:
config_dict[parsed_field.name] = field.value
break
if not value_found:
section_path = path_parts[0]
field_identifier = path_parts[1]

matched_field = None
for attr in FIELD_MATCH_ATTRIBUTES:
matched_field = _find_field(
item.fields, field_identifier, section_path, sections, attr
)
if matched_field:
break

if matched_field:
if config_object:
setattr(config_object, parsed_field.name, matched_field.value)
else:
config_dict[parsed_field.name] = matched_field.value
else:
raise UnknownSectionAndFieldTag(
f"There is no section {path_parts[0]} \
for field {path_parts[1]}"
f"There is no section {section_path} \
for field {field_identifier}"
)


def _convert_sections_to_dict(sections: List[Section]):
"""Convert a list of sections into a lookup dict.

Builds a mapping that supports lookup by both section label and
section id. Label-based keys take priority — an id-based key is
only added when it does not collide with an existing label key.

Args:
sections (List[Section]): The sections from a 1Password item.

Returns:
dict: A mapping of section label/id to section id.
"""
if not sections:
return {}

section_dict = {section.label: section.id for section in sections}
for section in sections:
if section.id not in section_dict:
section_dict[section.id] = section.id
return section_dict


Expand Down
126 changes: 126 additions & 0 deletions src/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
USERNAME_VALUE = "new_user"
PASSWORD_VALUE = "password"
HOST_VALUE = "http://somehost"
API_KEY_VALUE = "sk-test-abc123"
DB_PORT_VALUE = "5432"


class Config:
Expand Down Expand Up @@ -118,3 +120,127 @@ def test_load_dict(respx_mock):
}
]
}

# Item with field ids that differ from their labels, used to test
# the field.id fallback lookup path.
ITEM_NAME3 = "Service Credentials"
ITEM_ID3 = "wepiqdxdzncjtnvmv5fegud4q3"

item_with_distinct_ids = {
"id": ITEM_ID3,
"title": ITEM_NAME3,
"vault": {
"id": VAULT_ID
},
"category": "LOGIN",
"sections": [
{
"id": "Section_A1B2C3",
"label": "api_settings"
}
],
"fields": [
{
"id": "Field_X9Y8Z7",
"label": "API Key",
"value": API_KEY_VALUE,
"section": {
"id": "Section_A1B2C3"
}
},
{
"id": "Field_D4E5F6",
"label": "Database Port",
"value": DB_PORT_VALUE
}
]
}


def test_load_dict_by_field_id(respx_mock):
"""load_dict should resolve fields by id when label doesn't match."""
config_dict = {
"api_key": {
"opitem": ITEM_NAME3,
"opfield": "api_settings.Field_X9Y8Z7",
"opvault": VAULT_ID
},
"db_port": {
"opitem": ITEM_NAME3,
"opfield": ".Field_D4E5F6",
"opvault": VAULT_ID
}
}

respx_mock.get(
f"v1/vaults/{VAULT_ID}/items?filter=title eq \"{ITEM_NAME3}\""
).mock(return_value=Response(200, json=[item_with_distinct_ids]))
respx_mock.get(
f"v1/vaults/{VAULT_ID}/items/{ITEM_ID3}"
).mock(return_value=Response(200, json=item_with_distinct_ids))

config_values = onepasswordconnectsdk.load_dict(SS_CLIENT, config_dict)

assert config_values["api_key"] == API_KEY_VALUE
assert config_values["db_port"] == DB_PORT_VALUE


def test_load_by_field_id(respx_mock):
"""load should resolve fields by id when label doesn't match."""

class ConfigById:
api_key: f'opitem:"{ITEM_NAME3}" opfield:api_settings.Field_X9Y8Z7 opvault:{VAULT_ID}' = None
db_port: f'opitem:"{ITEM_NAME3}" opfield:.Field_D4E5F6 opvault:{VAULT_ID}' = None

respx_mock.get(
f"v1/vaults/{VAULT_ID}/items?filter=title eq \"{ITEM_NAME3}\""
).mock(return_value=Response(200, json=[item_with_distinct_ids]))
respx_mock.get(
f"v1/vaults/{VAULT_ID}/items/{ITEM_ID3}"
).mock(return_value=Response(200, json=item_with_distinct_ids))

config_obj = onepasswordconnectsdk.load(SS_CLIENT, ConfigById())

assert config_obj.api_key == API_KEY_VALUE
assert config_obj.db_port == DB_PORT_VALUE


def test_load_dict_label_takes_priority(respx_mock):
"""When both label and id could match, label should win."""
ambiguous_item = {
"id": "wepiqdxdzncjtnvmv5fegud4q4",
"title": "Ambiguous Item",
"vault": {"id": VAULT_ID},
"category": "LOGIN",
"fields": [
{
"id": "shared_ref",
"label": "wrong_label",
"value": "value_from_id_match"
},
{
"id": "other_id",
"label": "shared_ref",
"value": "value_from_label_match"
}
]
}

config_dict = {
"result": {
"opitem": "Ambiguous Item",
"opfield": ".shared_ref",
"opvault": VAULT_ID
}
}

respx_mock.get(
f"v1/vaults/{VAULT_ID}/items?filter=title eq \"Ambiguous Item\""
).mock(return_value=Response(200, json=[ambiguous_item]))
respx_mock.get(
f"v1/vaults/{VAULT_ID}/items/wepiqdxdzncjtnvmv5fegud4q4"
).mock(return_value=Response(200, json=ambiguous_item))

config_values = onepasswordconnectsdk.load_dict(SS_CLIENT, config_dict)

assert config_values["result"] == "value_from_label_match"