diff --git a/hazelcast/internal/asyncio_cluster.py b/hazelcast/internal/asyncio_cluster.py index b56d136cda..a809a66d93 100644 --- a/hazelcast/internal/asyncio_cluster.py +++ b/hazelcast/internal/asyncio_cluster.py @@ -262,7 +262,7 @@ def _fire_membership_events(self, dead_members, new_members): if handler: try: handler(dead_member) - except: + except Exception: _logger.exception("Exception in membership listener") for new_member in new_members: @@ -270,7 +270,7 @@ def _fire_membership_events(self, dead_members, new_members): if handler: try: handler(new_member) - except: + except Exception: _logger.exception("Exception in membership listener") def _detect_membership_events(self, previous_members, current_members): diff --git a/hazelcast/internal/asyncio_connection.py b/hazelcast/internal/asyncio_connection.py index 5d20192086..a3912d8aad 100644 --- a/hazelcast/internal/asyncio_connection.py +++ b/hazelcast/internal/asyncio_connection.py @@ -5,7 +5,7 @@ import struct import time import uuid -from typing import Coroutine +from typing import Coroutine, Tuple from hazelcast import __version__ from hazelcast.config import ReconnectMode @@ -307,11 +307,8 @@ async def connect_to_all_cluster_members(self, sync_start): self._start_connect_all_members_timer() - async def on_connection_close(self, closed_connection): - remote_uuid = closed_connection.remote_uuid - remote_address = closed_connection.remote_address - - if not remote_address: + async def on_connection_close(self, closed_connection, unsafe=False): + if not closed_connection.remote_address: _logger.debug( "Destroying %s, but it has no remote address, hence nothing is " "removed from the connection dictionary", @@ -319,25 +316,9 @@ async def on_connection_close(self, closed_connection): ) return - disconnected = False - removed = False - trigger_reconnection = False - async with self._lock: - connection = self.active_connections.get(remote_uuid, None) - if connection == closed_connection: - 
self.active_connections.pop(remote_uuid, None) - removed = True - _logger.info( - "Removed connection to %s:%s, connection: %s", - remote_address, - remote_uuid, - connection, - ) - - if not self.active_connections: - trigger_reconnection = True - if self._client_state == ClientState.INITIALIZED_ON_CLUSTER: - disconnected = True + disconnected, removed, trigger_reconnection = await self._determine_connection_state( + closed_connection, unsafe=unsafe + ) if disconnected: self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED) @@ -359,9 +340,40 @@ async def on_connection_close(self, closed_connection): _logger.debug( "Destroying %s, but there is no mapping for %s in the connection dictionary", closed_connection, + closed_connection.remote_uuid, + ) + + async def _determine_connection_state( + self, closed_connection, unsafe=False + ) -> Tuple[bool, bool, bool]: + if unsafe: + return self._determine_connection_state_unsafe(closed_connection) + async with self._lock: + return self._determine_connection_state_unsafe(closed_connection) + + def _determine_connection_state_unsafe(self, closed_connection) -> Tuple[bool, bool, bool]: + remote_uuid = closed_connection.remote_uuid + disconnected = False + removed = False + trigger_reconnection = False + connection = self.active_connections.get(remote_uuid, None) + if connection == closed_connection: + self.active_connections.pop(remote_uuid, None) + removed = True + _logger.info( + "Removed connection to %s:%s, connection: %s", + closed_connection.remote_address, remote_uuid, + connection, ) + if not self.active_connections: + trigger_reconnection = True + if self._client_state == ClientState.INITIALIZED_ON_CLUSTER: + disconnected = True + + return disconnected, removed, trigger_reconnection + def check_invocation_allowed(self): state = self._client_state if state == ClientState.INITIALIZED_ON_CLUSTER and self.active_connections: @@ -464,6 +476,12 @@ def _init_wait_strategy(self, config): def 
_start_connect_all_members_timer(self): connecting_uuids = set() + async def connect_to_member(member): + try: + await self._get_or_connect_to_member(member) + except Exception: + _logger.debug("Error connecting to %s in reconnect timer", member, exc_info=True) + async def run(): await asyncio.sleep(1) if not self._lifecycle_service.running: @@ -480,7 +498,7 @@ async def run(): connecting_uuids.add(member_uuid) if not self._lifecycle_service.running: break - tg.create_task(self._get_or_connect_to_member(member)) + tg.create_task(connect_to_member(member)) member_uuids.append(member_uuid) for item in member_uuids: @@ -658,49 +676,54 @@ async def _handle_successful_auth(self, response, connection): existing = self.active_connections.get(remote_uuid, None) - if existing: - await connection.close_connection( - "Duplicate connection to same member with UUID: %s" % remote_uuid, None - ) - return existing - - new_cluster_id = response["cluster_id"] - changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id - if changed_cluster: - await self._check_client_state_on_cluster_change(connection) - _logger.warning( - "Switching from current cluster: %s to new cluster: %s", - self._cluster_id, - new_cluster_id, - ) - self._on_cluster_restart() + if existing: + await connection.close_connection( + "Duplicate connection to same member with UUID: %s" % remote_uuid, + None, + unsafe=True, + ) + return existing + + new_cluster_id = response["cluster_id"] + changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id + if changed_cluster: + await self._check_client_state_on_cluster_change(connection) + _logger.warning( + "Switching from current cluster: %s to new cluster: %s", + self._cluster_id, + new_cluster_id, + ) + self._on_cluster_restart() - async with self._lock: is_initial_connection = not self.active_connections self.active_connections[remote_uuid] = connection fire_connected_lifecycle_event = False - if 
is_initial_connection: - self._cluster_id = new_cluster_id - # In split brain, the client might connect to the one half - # of the cluster, and then later might reconnect to the - # other half, after the half it was connected to is - # completely dead. Since the cluster id is preserved in - # split brain scenarios, it is impossible to distinguish - # reconnection to the same cluster vs reconnection to the - # other half of the split brain. However, in the latter, - # we might need to send some state to the other half of - # the split brain (like Compact schemas). That forces us - # to send the client state to the cluster after the first - # cluster connection, regardless the cluster id is - # changed or not. - if self._established_initial_cluster_connection: - self._client_state = ClientState.CONNECTED_TO_CLUSTER - await self._initialize_on_cluster(new_cluster_id) - else: - fire_connected_lifecycle_event = True - self._established_initial_cluster_connection = True - self._client_state = ClientState.INITIALIZED_ON_CLUSTER + init_on_cluster = False + if is_initial_connection: + self._cluster_id = new_cluster_id + # In split brain, the client might connect to the one half + # of the cluster, and then later might reconnect to the + # other half, after the half it was connected to is + # completely dead. Since the cluster id is preserved in + # split brain scenarios, it is impossible to distinguish + # reconnection to the same cluster vs reconnection to the + # other half of the split brain. However, in the latter, + # we might need to send some state to the other half of + # the split brain (like Compact schemas). That forces us + # to send the client state to the cluster after the first + # cluster connection, regardless the cluster id is + # changed or not. 
+ if self._established_initial_cluster_connection: + self._client_state = ClientState.CONNECTED_TO_CLUSTER + init_on_cluster = True + else: + fire_connected_lifecycle_event = True + self._established_initial_cluster_connection = True + self._client_state = ClientState.INITIALIZED_ON_CLUSTER + + if init_on_cluster: + await self._initialize_on_cluster(new_cluster_id) if fire_connected_lifecycle_event: self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED) @@ -777,7 +800,7 @@ async def _check_client_state_on_cluster_change(self, connection): # we can operate on. In those scenarios, we rely on the fact that we will # reopen the connections. reason = "Connection does not belong to the cluster %s" % self._cluster_id - await connection.close_connection(reason, None) + await connection.close_connection(reason, None, unsafe=True) raise ValueError(reason) def _on_cluster_restart(self): @@ -985,13 +1008,13 @@ def send_message(self, message): self._write(message.buf) return True - # Not named close to distinguish it from the asyncore.dispatcher.close. - async def close_connection(self, reason, cause): + async def close_connection(self, reason, cause, unsafe=False): """Closes the connection. Args: reason (str): The reason this connection is going to be closed. Is allowed to be None. cause (Exception): The exception responsible for closing this connection. Is allowed to be None. + unsafe (bool): Do not acquire a lock """ if not self.live: return @@ -1003,7 +1026,7 @@ async def close_connection(self, reason, cause): self._inner_close() except Exception: _logger.exception("Error while closing the the connection %s", self) - await self._connection_manager.on_connection_close(self) + await self._connection_manager.on_connection_close(self, unsafe=unsafe) def _log_close(self, reason, cause): msg = "%s closed. 
Reason: %s" diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 9daeca0e1f..1c688c037b 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -1,3 +1,4 @@ +import asyncio import typing from hazelcast.internal.asyncio_proxy.vector_collection import ( @@ -29,11 +30,24 @@ def __init__(self, context): async def get_or_create(self, service_name, name, create_on_remote=True): ns = (service_name, name) - if ns in self._proxies: - return self._proxies[ns] + proxy = self._proxies.get(ns) + if proxy is not None: + if isinstance(proxy, asyncio.Future): + return await proxy + return proxy - proxy = await self._create_proxy(service_name, name, create_on_remote) + # allocate the proxy slot, so a task that tries to access the same proxy knows it's being created + fut = asyncio.get_running_loop().create_future() + self._proxies[ns] = fut + try: + proxy = await self._create_proxy(service_name, name, create_on_remote) + except BaseException as e: + self._proxies.pop(ns, None) + fut.set_exception(e) + raise + # replace the placeholder with the proxy self._proxies[ns] = proxy + fut.set_result(proxy) return proxy async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy: @@ -59,4 +73,4 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True): return False def get_distributed_objects(self): - return to_list(self._proxies.values()) + return to_list(v for v in self._proxies.values() if not isinstance(v, asyncio.Future)) diff --git a/hazelcast/internal/asyncio_proxy/map.py b/hazelcast/internal/asyncio_proxy/map.py index a911ef8a38..57d0a755c7 100644 --- a/hazelcast/internal/asyncio_proxy/map.py +++ b/hazelcast/internal/asyncio_proxy/map.py @@ -279,23 +279,32 @@ def handle_event_entry( number_of_affected_entries, ) if event.event_type == EntryEventType.ADDED: - added_func(event) + if added_func: + added_func(event) elif event.event_type == 
EntryEventType.REMOVED: - removed_func(event) + if removed_func: + removed_func(event) elif event.event_type == EntryEventType.UPDATED: - updated_func(event) + if updated_func: + updated_func(event) elif event.event_type == EntryEventType.EVICTED: - evicted_func(event) + if evicted_func: + evicted_func(event) elif event.event_type == EntryEventType.EVICT_ALL: - evict_all_func(event) + if evict_all_func: + evict_all_func(event) elif event.event_type == EntryEventType.CLEAR_ALL: - clear_all_func(event) + if clear_all_func: + clear_all_func(event) elif event.event_type == EntryEventType.MERGED: - merged_func(event) + if merged_func: + merged_func(event) elif event.event_type == EntryEventType.EXPIRED: - expired_func(event) + if expired_func: + expired_func(event) elif event.event_type == EntryEventType.LOADED: - loaded_func(event) + if loaded_func: + loaded_func(event) return await self._register_listener( request, diff --git a/hazelcast/internal/asyncio_proxy/vector_collection.py b/hazelcast/internal/asyncio_proxy/vector_collection.py index d9fd258915..3e81813f63 100644 --- a/hazelcast/internal/asyncio_proxy/vector_collection.py +++ b/hazelcast/internal/asyncio_proxy/vector_collection.py @@ -410,6 +410,7 @@ def handler(message): value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.put_if_absent, key, document) + document = copy.copy(document) document.value = value_data request = vector_collection_put_if_absent_codec.encode_request( self.name, diff --git a/hazelcast/internal/asyncio_reactor.py b/hazelcast/internal/asyncio_reactor.py index f08e1bde55..38c2060cf2 100644 --- a/hazelcast/internal/asyncio_reactor.py +++ b/hazelcast/internal/asyncio_reactor.py @@ -69,6 +69,7 @@ def __init__( self._preconn_buffers: list = [] self._create_task: asyncio.Task | None = None self._close_task: asyncio.Task | None = None + self._connect_timer_task: asyncio.Task | None = None self._connected = False 
self._receive_buffer_size = _BUFFER_SIZE self._sock = None @@ -239,7 +240,7 @@ def _set_socket_options(self, sock, config): sock.setsockopt(level, option_name, value) def _create_ssl_context(self, config: Config): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) protocol = config.ssl_protocol # Use only the configured protocol try: @@ -285,10 +286,11 @@ def __init__(self, conn: AsyncioConnection): self.start_time: float | None = None self._write_buf = io.BytesIO() self._write_buf_size = 0 + self._flush_scheduled = False self._recv_buf = None # asyncio tasks are weakly referenced # storing tasks here in order not to lose them midway - # see: https: // docs.python.org / 3 / library / asyncio - task.html # creating-tasks + # see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks self._tasks: set = set() def connection_made(self, transport: transports.BaseTransport): @@ -296,7 +298,6 @@ def connection_made(self, transport: transports.BaseTransport): self.start_time = time.time() self.write(self.PROTOCOL_STARTER) _logger.debug("Connected to %s", self._conn._address) - self._conn._loop.call_soon(self._write_loop) def connection_lost(self, exc): _logger.warning("Connection closed by server") @@ -313,6 +314,9 @@ def close(self): def write(self, buf): self._write_buf.write(buf) self._write_buf_size += len(buf) + if not self._flush_scheduled: + self._flush_scheduled = True + self._conn._loop.call_soon(self._flush) def get_buffer(self, sizehint): if self._recv_buf is None: @@ -338,9 +342,9 @@ def _do_write(self): self._write_buf.seek(0) self._write_buf_size = 0 - def _write_loop(self): + def _flush(self): + self._flush_scheduled = False self._do_write() - return self._conn._loop.call_later(0.01, self._write_loop) def _strerror(err): diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index 703c8e196f..86d6dff2bf 100644 --- a/hazelcast/proxy/multi_map.py +++ 
b/hazelcast/proxy/multi_map.py @@ -117,12 +117,15 @@ def handle_event_entry( uuid, number_of_affected_entries, ) - if event.event_type == EntryEventType.ADDED and added_func: - added_func(event) - elif event.event_type == EntryEventType.REMOVED and removed_func: - removed_func(event) - elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: - clear_all_func(event) + if event.event_type == EntryEventType.ADDED: + if added_func: + added_func(event) + elif event.event_type == EntryEventType.REMOVED: + if removed_func: + removed_func(event) + elif event.event_type == EntryEventType.CLEAR_ALL: + if clear_all_func: + clear_all_func(event) return self._register_listener( request, diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index 70fe58d771..78e527112a 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -171,16 +171,21 @@ def handle_event_entry( uuid, number_of_affected_entries, ) - if event.event_type == EntryEventType.ADDED and added_func: - added_func(event) - elif event.event_type == EntryEventType.REMOVED and removed_func: - removed_func(event) - elif event.event_type == EntryEventType.UPDATED and updated_func: - updated_func(event) - elif event.event_type == EntryEventType.EVICTED and evicted_func: - evicted_func(event) - elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: - clear_all_func(event) + if event.event_type == EntryEventType.ADDED: + if added_func: + added_func(event) + elif event.event_type == EntryEventType.REMOVED: + if removed_func: + removed_func(event) + elif event.event_type == EntryEventType.UPDATED: + if updated_func: + updated_func(event) + elif event.event_type == EntryEventType.EVICTED: + if evicted_func: + evicted_func(event) + elif event.event_type == EntryEventType.CLEAR_ALL: + if clear_all_func: + clear_all_func(event) return self._register_listener( request, diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 
00455063f2..df591d75af 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -543,7 +543,7 @@ def _set_socket_options(self, config): self.socket.setsockopt(level, option_name, value) def _wrap_as_ssl_socket(self, config: Config, hostname: str): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) protocol = config.ssl_protocol diff --git a/tests/integration/asyncio/statistics_test.py b/tests/integration/asyncio/statistics_test.py index 01b22abd0f..f9148d85af 100644 --- a/tests/integration/asyncio/statistics_test.py +++ b/tests/integration/asyncio/statistics_test.py @@ -262,5 +262,5 @@ def get_runtime_and_system_metrics(self, client): try: # Compatibility for <4.2.1 clients return s._get_os_and_runtime_stats() - except: + except Exception: return itertools.chain(s._registered_system_gauges, s._registered_process_gauges)