diff --git a/hazelcast/cluster.py b/hazelcast/cluster.py index ce5ec1d6ad..a1e3325893 100644 --- a/hazelcast/cluster.py +++ b/hazelcast/cluster.py @@ -140,7 +140,7 @@ def start(self, connection_manager, membership_listeners): self.add_listener(*listener) def get_member(self, member_uuid): - check_not_none(uuid, "UUID must not be null") + check_not_none(member_uuid, "UUID must not be null") snapshot = self._member_list_snapshot return snapshot.members.get(member_uuid, None) diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index f415611547..36ae965f4f 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -64,7 +64,7 @@ class HazelcastClient: from hazelcast.asyncio import HazelcastClient - client = await HazelcastClient.crate_and_start( + client = await HazelcastClient.create_and_start( cluster_name="a-cluster", ) diff --git a/hazelcast/internal/asyncio_cluster.py b/hazelcast/internal/asyncio_cluster.py index 4ce2491b65..a809a66d93 100644 --- a/hazelcast/internal/asyncio_cluster.py +++ b/hazelcast/internal/asyncio_cluster.py @@ -134,6 +134,9 @@ def __init__(self, client, config): self._listeners = {} self._member_list_snapshot = _EMPTY_SNAPSHOT self._initial_list_fetched = asyncio.Event() + # asyncio tasks are weakly referenced; keep strong refs until they finish. 
+ # see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks + self._close_tasks: typing.Set[asyncio.Task] = set() def start(self, connection_manager, membership_listeners): self._connection_manager = connection_manager @@ -141,7 +144,7 @@ def start(self, connection_manager, membership_listeners): self.add_listener(*listener) def get_member(self, member_uuid): - check_not_none(uuid, "UUID must not be null") + check_not_none(member_uuid, "UUID must not be null") snapshot = self._member_list_snapshot return snapshot.members.get(member_uuid, None) @@ -259,7 +262,7 @@ def _fire_membership_events(self, dead_members, new_members): if handler: try: handler(dead_member) - except: + except Exception: _logger.exception("Exception in membership listener") for new_member in new_members: @@ -267,7 +270,7 @@ def _fire_membership_events(self, dead_members, new_members): if handler: try: handler(new_member) - except: + except Exception: _logger.exception("Exception in membership listener") def _detect_membership_events(self, previous_members, current_members): @@ -282,14 +285,18 @@ def _detect_membership_events(self, previous_members, current_members): for dead_member in dead_members: connection = self._connection_manager.get_connection(dead_member.uuid) if connection: - connection.close_connection( - None, - TargetDisconnectedError( - "The client has closed the connection to this member, " - "after receiving a member left event from the cluster. " - "%s" % connection - ), + task = asyncio.create_task( + connection.close_connection( + None, + TargetDisconnectedError( + "The client has closed the connection to this member, " + "after receiving a member left event from the cluster. 
" + "%s" % connection + ), + ) ) + self._close_tasks.add(task) + task.add_done_callback(self._close_tasks.discard) if (len(new_members) + len(dead_members)) > 0: if len(current_members) > 0: diff --git a/hazelcast/internal/asyncio_compact.py b/hazelcast/internal/asyncio_compact.py index 1533befdc3..9ba6746961 100644 --- a/hazelcast/internal/asyncio_compact.py +++ b/hazelcast/internal/asyncio_compact.py @@ -102,6 +102,7 @@ async def _replicate_schema( # is not known to be replicated yet. We should retry # sending it in a random member. await asyncio.sleep(self._invocation_retry_pause) + remaining_retries -= 1 # We tried to send it a couple of times, but the member list # in our local and the member list returned by the initiator diff --git a/hazelcast/internal/asyncio_connection.py b/hazelcast/internal/asyncio_connection.py index 5d20192086..762d1add1a 100644 --- a/hazelcast/internal/asyncio_connection.py +++ b/hazelcast/internal/asyncio_connection.py @@ -5,7 +5,7 @@ import struct import time import uuid -from typing import Coroutine +from typing import Coroutine, Tuple from hazelcast import __version__ from hazelcast.config import ReconnectMode @@ -307,11 +307,8 @@ async def connect_to_all_cluster_members(self, sync_start): self._start_connect_all_members_timer() - async def on_connection_close(self, closed_connection): - remote_uuid = closed_connection.remote_uuid - remote_address = closed_connection.remote_address - - if not remote_address: + async def on_connection_close(self, closed_connection, unsafe=False): + if not closed_connection.remote_address: _logger.debug( "Destroying %s, but it has no remote address, hence nothing is " "removed from the connection dictionary", @@ -319,25 +316,7 @@ async def on_connection_close(self, closed_connection): ) return - disconnected = False - removed = False - trigger_reconnection = False - async with self._lock: - connection = self.active_connections.get(remote_uuid, None) - if connection == closed_connection: - 
self.active_connections.pop(remote_uuid, None) - removed = True - _logger.info( - "Removed connection to %s:%s, connection: %s", - remote_address, - remote_uuid, - connection, - ) - - if not self.active_connections: - trigger_reconnection = True - if self._client_state == ClientState.INITIALIZED_ON_CLUSTER: - disconnected = True + disconnected, removed, trigger_reconnection = await self._determine_connection_state(closed_connection, unsafe=unsafe) if disconnected: self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED) @@ -359,9 +338,39 @@ async def on_connection_close(self, closed_connection): _logger.debug( "Destroying %s, but there is no mapping for %s in the connection dictionary", closed_connection, + closed_connection.remote_uuid, + ) + + async def _determine_connection_state(self, closed_connection, unsafe=False) -> Tuple[bool, bool, bool]: + if unsafe: + return self._determine_connection_state_unsafe(closed_connection) + async with self._lock: + return self._determine_connection_state_unsafe(closed_connection) + + def _determine_connection_state_unsafe(self, closed_connection) -> Tuple[bool, bool, bool]: + remote_uuid = closed_connection.remote_uuid + disconnected = False + removed = False + trigger_reconnection = False + connection = self.active_connections.get(remote_uuid, None) + if connection == closed_connection: + self.active_connections.pop(remote_uuid, None) + removed = True + _logger.info( + "Removed connection to %s:%s, connection: %s", + closed_connection.remote_address, remote_uuid, + connection, ) + if not self.active_connections: + trigger_reconnection = True + if self._client_state == ClientState.INITIALIZED_ON_CLUSTER: + disconnected = True + + return disconnected, removed, trigger_reconnection + + def check_invocation_allowed(self): state = self._client_state if state == ClientState.INITIALIZED_ON_CLUSTER and self.active_connections: @@ -464,6 +473,12 @@ def _init_wait_strategy(self, config): def 
_start_connect_all_members_timer(self): connecting_uuids = set() + async def connect_to_member(member): + try: + await self._get_or_connect_to_member(member) + except Exception: + _logger.debug("Error connecting to %s in reconnect timer", member, exc_info=True) + async def run(): await asyncio.sleep(1) if not self._lifecycle_service.running: @@ -480,7 +495,7 @@ async def run(): connecting_uuids.add(member_uuid) if not self._lifecycle_service.running: break - tg.create_task(self._get_or_connect_to_member(member)) + tg.create_task(connect_to_member(member)) member_uuids.append(member_uuid) for item in member_uuids: @@ -658,49 +673,52 @@ async def _handle_successful_auth(self, response, connection): existing = self.active_connections.get(remote_uuid, None) - if existing: - await connection.close_connection( - "Duplicate connection to same member with UUID: %s" % remote_uuid, None - ) - return existing - - new_cluster_id = response["cluster_id"] - changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id - if changed_cluster: - await self._check_client_state_on_cluster_change(connection) - _logger.warning( - "Switching from current cluster: %s to new cluster: %s", - self._cluster_id, - new_cluster_id, - ) - self._on_cluster_restart() + if existing: + await connection.close_connection( + "Duplicate connection to same member with UUID: %s" % remote_uuid, None, unsafe=True + ) + return existing + + new_cluster_id = response["cluster_id"] + changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id + if changed_cluster: + await self._check_client_state_on_cluster_change(connection) + _logger.warning( + "Switching from current cluster: %s to new cluster: %s", + self._cluster_id, + new_cluster_id, + ) + self._on_cluster_restart() - async with self._lock: is_initial_connection = not self.active_connections self.active_connections[remote_uuid] = connection fire_connected_lifecycle_event = False - if is_initial_connection: 
- self._cluster_id = new_cluster_id - # In split brain, the client might connect to the one half - # of the cluster, and then later might reconnect to the - # other half, after the half it was connected to is - # completely dead. Since the cluster id is preserved in - # split brain scenarios, it is impossible to distinguish - # reconnection to the same cluster vs reconnection to the - # other half of the split brain. However, in the latter, - # we might need to send some state to the other half of - # the split brain (like Compact schemas). That forces us - # to send the client state to the cluster after the first - # cluster connection, regardless the cluster id is - # changed or not. - if self._established_initial_cluster_connection: - self._client_state = ClientState.CONNECTED_TO_CLUSTER - await self._initialize_on_cluster(new_cluster_id) - else: - fire_connected_lifecycle_event = True - self._established_initial_cluster_connection = True - self._client_state = ClientState.INITIALIZED_ON_CLUSTER + init_on_cluster = False + if is_initial_connection: + self._cluster_id = new_cluster_id + # In split brain, the client might connect to the one half + # of the cluster, and then later might reconnect to the + # other half, after the half it was connected to is + # completely dead. Since the cluster id is preserved in + # split brain scenarios, it is impossible to distinguish + # reconnection to the same cluster vs reconnection to the + # other half of the split brain. However, in the latter, + # we might need to send some state to the other half of + # the split brain (like Compact schemas). That forces us + # to send the client state to the cluster after the first + # cluster connection, regardless the cluster id is + # changed or not. 
+ if self._established_initial_cluster_connection: + self._client_state = ClientState.CONNECTED_TO_CLUSTER + init_on_cluster = True + else: + fire_connected_lifecycle_event = True + self._established_initial_cluster_connection = True + self._client_state = ClientState.INITIALIZED_ON_CLUSTER + + if init_on_cluster: + await self._initialize_on_cluster(new_cluster_id) if fire_connected_lifecycle_event: self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED) @@ -777,7 +795,7 @@ async def _check_client_state_on_cluster_change(self, connection): # we can operate on. In those scenarios, we rely on the fact that we will # reopen the connections. reason = "Connection does not belong to the cluster %s" % self._cluster_id - await connection.close_connection(reason, None) + await connection.close_connection(reason, None, unsafe=True) raise ValueError(reason) def _on_cluster_restart(self): @@ -985,13 +1003,13 @@ def send_message(self, message): self._write(message.buf) return True - # Not named close to distinguish it from the asyncore.dispatcher.close. - async def close_connection(self, reason, cause): + async def close_connection(self, reason, cause, unsafe=False): """Closes the connection. Args: reason (str): The reason this connection is going to be closed. Is allowed to be None. cause (Exception): The exception responsible for closing this connection. Is allowed to be None. + unsafe (bool): Do not acquire a lock """ if not self.live: return @@ -1003,7 +1021,7 @@ async def close_connection(self, reason, cause): self._inner_close() except Exception: _logger.exception("Error while closing the the connection %s", self) - await self._connection_manager.on_connection_close(self) + await self._connection_manager.on_connection_close(self, unsafe=unsafe) def _log_close(self, reason, cause): msg = "%s closed. 
Reason: %s" diff --git a/hazelcast/internal/asyncio_discovery.py b/hazelcast/internal/asyncio_discovery.py index dcd890e8fb..9f755cc5cb 100644 --- a/hazelcast/internal/asyncio_discovery.py +++ b/hazelcast/internal/asyncio_discovery.py @@ -53,6 +53,6 @@ async def translate(self, address): async def refresh(self): """Refreshes the internal lookup table if necessary.""" try: - self._private_to_public = self.cloud_discovery.discover_nodes() + self._private_to_public = await asyncio.to_thread(self.cloud_discovery.discover_nodes) except Exception as e: _logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e) diff --git a/hazelcast/internal/asyncio_listener.py b/hazelcast/internal/asyncio_listener.py index 43642761a6..f041df03b1 100644 --- a/hazelcast/internal/asyncio_listener.py +++ b/hazelcast/internal/asyncio_listener.py @@ -82,51 +82,54 @@ async def register_listener( tg.create_task(task) return registration_id except Exception: - await self.deregister_listener(registration_id) + await self._deregister_listener_unsafe(registration_id) raise HazelcastError("Listener cannot be added") async def deregister_listener(self, user_registration_id): check_not_none(user_registration_id, "None user_registration_id is not allowed!") async with self._registration_lock: - listener_registration = self._active_registrations.pop(user_registration_id, None) - if not listener_registration: - return False - - async def handle(inv: Invocation, conn: AsyncioConnection): - try: - await inv.future - except Exception as e: - if not isinstance( - e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError) - ): - _logger.warning( - "Deregistration of listener with ID %s has failed for address %s: %s", - user_registration_id, - conn.remote_address, - e, - ) + return await self._deregister_listener_unsafe(user_registration_id) - async with asyncio.TaskGroup() as tg: - items = listener_registration.connection_registrations.items() - for connection, event_registration in 
items: - # Remove local handler - self.remove_event_handler(event_registration.correlation_id) - # The rest is for deleting the remote registration - server_registration_id = event_registration.server_registration_id - deregister_request = listener_registration.encode_deregister_request( - server_registration_id - ) - if deregister_request is None: - # None means no remote registration (e.g. for backup acks) - continue - invocation = Invocation( - deregister_request, connection=connection, timeout=sys.maxsize, urgent=True + async def _deregister_listener_unsafe(self, user_registration_id): + listener_registration = self._active_registrations.pop(user_registration_id, None) + if not listener_registration: + return False + + async def handle(inv: Invocation, conn: AsyncioConnection): + try: + await inv.future + except Exception as e: + if not isinstance( + e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError) + ): + _logger.warning( + "Deregistration of listener with ID %s has failed for address %s: %s", + user_registration_id, + conn.remote_address, + e, ) - self._invocation_service.invoke(invocation) - tg.create_task(handle(invocation, connection)) - listener_registration.connection_registrations.clear() - return True + async with asyncio.TaskGroup() as tg: + items = listener_registration.connection_registrations.items() + for connection, event_registration in items: + # Remove local handler + self.remove_event_handler(event_registration.correlation_id) + # The rest is for deleting the remote registration + server_registration_id = event_registration.server_registration_id + deregister_request = listener_registration.encode_deregister_request( + server_registration_id + ) + if deregister_request is None: + # None means no remote registration (e.g. 
for backup acks) + continue + invocation = Invocation( + deregister_request, connection=connection, timeout=sys.maxsize, urgent=True + ) + self._invocation_service.invoke(invocation) + tg.create_task(handle(invocation, connection)) + + listener_registration.connection_registrations.clear() + return True def handle_client_message(self, message: InboundMessage, correlation_id: int): handler = self._event_handlers.get(correlation_id, None) diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 9daeca0e1f..2f4aa68144 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -1,3 +1,4 @@ +import asyncio import typing from hazelcast.internal.asyncio_proxy.vector_collection import ( @@ -29,11 +30,24 @@ def __init__(self, context): async def get_or_create(self, service_name, name, create_on_remote=True): ns = (service_name, name) - if ns in self._proxies: - return self._proxies[ns] + proxy = self._proxies.get(ns) + if proxy is not None: + if isinstance(proxy, asyncio.Future): + return await proxy + return proxy - proxy = await self._create_proxy(service_name, name, create_on_remote) + # allocate the proxy slot, so a task that tries to access the same proxy knows it's being created + fut = asyncio.get_running_loop().create_future() + self._proxies[ns] = fut + try: + proxy = await self._create_proxy(service_name, name, create_on_remote) + except BaseException as e: + self._proxies.pop(ns, None) + fut.set_exception(e) + raise + # replace the placeholder with the proxy self._proxies[ns] = proxy + fut.set_result(proxy) return proxy async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy: @@ -59,4 +73,6 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True): return False def get_distributed_objects(self): - return to_list(self._proxies.values()) + return to_list( + v for v in self._proxies.values() if not isinstance(v, 
asyncio.Future) + ) diff --git a/hazelcast/internal/asyncio_proxy/map.py b/hazelcast/internal/asyncio_proxy/map.py index a911ef8a38..57d0a755c7 100644 --- a/hazelcast/internal/asyncio_proxy/map.py +++ b/hazelcast/internal/asyncio_proxy/map.py @@ -279,23 +279,32 @@ def handle_event_entry( number_of_affected_entries, ) if event.event_type == EntryEventType.ADDED: - added_func(event) + if added_func: + added_func(event) elif event.event_type == EntryEventType.REMOVED: - removed_func(event) + if removed_func: + removed_func(event) elif event.event_type == EntryEventType.UPDATED: - updated_func(event) + if updated_func: + updated_func(event) elif event.event_type == EntryEventType.EVICTED: - evicted_func(event) + if evicted_func: + evicted_func(event) elif event.event_type == EntryEventType.EVICT_ALL: - evict_all_func(event) + if evict_all_func: + evict_all_func(event) elif event.event_type == EntryEventType.CLEAR_ALL: - clear_all_func(event) + if clear_all_func: + clear_all_func(event) elif event.event_type == EntryEventType.MERGED: - merged_func(event) + if merged_func: + merged_func(event) elif event.event_type == EntryEventType.EXPIRED: - expired_func(event) + if expired_func: + expired_func(event) elif event.event_type == EntryEventType.LOADED: - loaded_func(event) + if loaded_func: + loaded_func(event) return await self._register_listener( request, diff --git a/hazelcast/internal/asyncio_proxy/vector_collection.py b/hazelcast/internal/asyncio_proxy/vector_collection.py index 2a2a1ce632..3e81813f63 100644 --- a/hazelcast/internal/asyncio_proxy/vector_collection.py +++ b/hazelcast/internal/asyncio_proxy/vector_collection.py @@ -390,7 +390,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return await self._send_schema_and_retry(e, self.set, key, document) + return await self._send_schema_and_retry(e, self.put, key, document) document = copy.copy(document) document.value = 
value_data request = vector_collection_put_codec.encode_request( @@ -409,7 +409,8 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return await self._send_schema_and_retry(e, self.set, key, document) + return await self._send_schema_and_retry(e, self.put_if_absent, key, document) + document = copy.copy(document) document.value = value_data request = vector_collection_put_if_absent_codec.encode_request( self.name, diff --git a/hazelcast/internal/asyncio_reactor.py b/hazelcast/internal/asyncio_reactor.py index f08e1bde55..38c2060cf2 100644 --- a/hazelcast/internal/asyncio_reactor.py +++ b/hazelcast/internal/asyncio_reactor.py @@ -69,6 +69,7 @@ def __init__( self._preconn_buffers: list = [] self._create_task: asyncio.Task | None = None self._close_task: asyncio.Task | None = None + self._connect_timer_task: asyncio.Task | None = None self._connected = False self._receive_buffer_size = _BUFFER_SIZE self._sock = None @@ -239,7 +240,7 @@ def _set_socket_options(self, sock, config): sock.setsockopt(level, option_name, value) def _create_ssl_context(self, config: Config): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) protocol = config.ssl_protocol # Use only the configured protocol try: @@ -285,10 +286,11 @@ def __init__(self, conn: AsyncioConnection): self.start_time: float | None = None self._write_buf = io.BytesIO() self._write_buf_size = 0 + self._flush_scheduled = False self._recv_buf = None # asyncio tasks are weakly referenced # storing tasks here in order not to lose them midway - # see: https: // docs.python.org / 3 / library / asyncio - task.html # creating-tasks + # see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks self._tasks: set = set() def connection_made(self, transport: transports.BaseTransport):
self.start_time = time.time() self.write(self.PROTOCOL_STARTER) _logger.debug("Connected to %s", self._conn._address) - self._conn._loop.call_soon(self._write_loop) def connection_lost(self, exc): _logger.warning("Connection closed by server") @@ -313,6 +314,9 @@ def close(self): def write(self, buf): self._write_buf.write(buf) self._write_buf_size += len(buf) + if not self._flush_scheduled: + self._flush_scheduled = True + self._conn._loop.call_soon(self._flush) def get_buffer(self, sizehint): if self._recv_buf is None: @@ -338,9 +342,9 @@ def _do_write(self): self._write_buf.seek(0) self._write_buf_size = 0 - def _write_loop(self): + def _flush(self): + self._flush_scheduled = False self._do_write() - return self._conn._loop.call_later(0.01, self._write_loop) def _strerror(err): diff --git a/hazelcast/internal/asyncio_statistics.py b/hazelcast/internal/asyncio_statistics.py index 71377481b7..ea4880d460 100644 --- a/hazelcast/internal/asyncio_statistics.py +++ b/hazelcast/internal/asyncio_statistics.py @@ -182,7 +182,7 @@ def _add_system_and_process_metrics(self, attributes, compressor): self._add_system_or_process_metric( attributes, compressor, gauge_name, value, value_type ) - except: + except Exception: _logger.exception("Error while collecting '%s'.", gauge_name) if not self._registered_process_gauges: @@ -197,7 +197,7 @@ def _add_system_and_process_metrics(self, attributes, compressor): self._add_system_or_process_metric( attributes, compressor, gauge_name, value, value_type ) - except: + except Exception: _logger.exception("Error while collecting '%s'.", gauge_name) def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type): @@ -343,7 +343,7 @@ def _add_near_cache_metric( try: self._add_metric(compressor, descriptor, value, value_type) self._add_attribute(attributes, metric, value, nc_name_with_prefix) - except: + except Exception: _logger.exception( "Error while collecting %s metric for near cache '%s'.", metric, nc_name 
) @@ -362,7 +362,7 @@ def _add_tcp_metric( ) try: self._add_metric(compressor, descriptor, value, value_type) - except: + except Exception: _logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric) def _add_metric(self, compressor, descriptor, value, value_type): diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index 703c8e196f..86d6dff2bf 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -117,12 +117,15 @@ def handle_event_entry( uuid, number_of_affected_entries, ) - if event.event_type == EntryEventType.ADDED and added_func: - added_func(event) - elif event.event_type == EntryEventType.REMOVED and removed_func: - removed_func(event) - elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: - clear_all_func(event) + if event.event_type == EntryEventType.ADDED: + if added_func: + added_func(event) + elif event.event_type == EntryEventType.REMOVED: + if removed_func: + removed_func(event) + elif event.event_type == EntryEventType.CLEAR_ALL: + if clear_all_func: + clear_all_func(event) return self._register_listener( request, diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index 70fe58d771..78e527112a 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -171,16 +171,21 @@ def handle_event_entry( uuid, number_of_affected_entries, ) - if event.event_type == EntryEventType.ADDED and added_func: - added_func(event) - elif event.event_type == EntryEventType.REMOVED and removed_func: - removed_func(event) - elif event.event_type == EntryEventType.UPDATED and updated_func: - updated_func(event) - elif event.event_type == EntryEventType.EVICTED and evicted_func: - evicted_func(event) - elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: - clear_all_func(event) + if event.event_type == EntryEventType.ADDED: + if added_func: + added_func(event) + elif event.event_type == EntryEventType.REMOVED: + 
if removed_func: + removed_func(event) + elif event.event_type == EntryEventType.UPDATED: + if updated_func: + updated_func(event) + elif event.event_type == EntryEventType.EVICTED: + if evicted_func: + evicted_func(event) + elif event.event_type == EntryEventType.CLEAR_ALL: + if clear_all_func: + clear_all_func(event) return self._register_listener( request, diff --git a/hazelcast/proxy/vector_collection.py b/hazelcast/proxy/vector_collection.py index 5e45931326..b39eb1f4d8 100644 --- a/hazelcast/proxy/vector_collection.py +++ b/hazelcast/proxy/vector_collection.py @@ -391,7 +391,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.set, key, document) + return self._send_schema_and_retry(e, self.put, key, document) document = copy.copy(document) document.value = value_data request = vector_collection_put_codec.encode_request( @@ -410,7 +410,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.set, key, document) + return self._send_schema_and_retry(e, self.put_if_absent, key, document) document.value = value_data request = vector_collection_put_if_absent_codec.encode_request( self.name, diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 00455063f2..df591d75af 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -543,7 +543,7 @@ def _set_socket_options(self, config): self.socket.setsockopt(level, option_name, value) def _wrap_as_ssl_socket(self, config: Config, hostname: str): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) protocol = config.ssl_protocol diff --git a/tests/integration/asyncio/statistics_test.py b/tests/integration/asyncio/statistics_test.py index 01b22abd0f..f9148d85af 100644 --- 
a/tests/integration/asyncio/statistics_test.py +++ b/tests/integration/asyncio/statistics_test.py @@ -262,5 +262,5 @@ def get_runtime_and_system_metrics(self, client): try: # Compatibility for <4.2.1 clients return s._get_os_and_runtime_stats() - except: + except Exception: return itertools.chain(s._registered_system_gauges, s._registered_process_gauges)