4 changes: 2 additions & 2 deletions hazelcast/internal/asyncio_cluster.py
@@ -262,15 +262,15 @@ def _fire_membership_events(self, dead_members, new_members):
if handler:
try:
handler(dead_member)
-                    except:
+                    except Exception:
_logger.exception("Exception in membership listener")

for new_member in new_members:
for handler, _ in self._listeners.values():
if handler:
try:
handler(new_member)
-                    except:
+                    except Exception:
_logger.exception("Exception in membership listener")

def _detect_membership_events(self, previous_members, current_members):
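Editor's note on the two fixes above: a bare `except:` also traps `BaseException` subclasses such as `SystemExit`, `KeyboardInterrupt`, and (since Python 3.8) `asyncio.CancelledError`, so a faulty listener could silently swallow task cancellation. A minimal, self-contained sketch of the narrowed handler; the listener and member names are illustrative, not from this PR:

```python
import logging

_logger = logging.getLogger(__name__)

def fire_listener(handler, member):
    try:
        handler(member)
    except Exception:
        # Faulty listeners are logged and skipped, but BaseException
        # subclasses (e.g. asyncio.CancelledError on Python 3.8+) still
        # propagate, so cancelling the surrounding task keeps working.
        _logger.exception("Exception in membership listener")

fire_listener(lambda m: 1 / 0, "member-1")  # ZeroDivisionError is logged, not raised
fire_listener(print, "member-2")            # prints normally
```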
159 changes: 91 additions & 68 deletions hazelcast/internal/asyncio_connection.py
@@ -5,7 +5,7 @@
import struct
import time
import uuid
-from typing import Coroutine
+from typing import Coroutine, Tuple

from hazelcast import __version__
from hazelcast.config import ReconnectMode
@@ -307,37 +307,18 @@ async def connect_to_all_cluster_members(self, sync_start):

self._start_connect_all_members_timer()

-    async def on_connection_close(self, closed_connection):
-        remote_uuid = closed_connection.remote_uuid
-        remote_address = closed_connection.remote_address
-
-        if not remote_address:
+    async def on_connection_close(self, closed_connection, unsafe=False):
+        if not closed_connection.remote_address:
_logger.debug(
"Destroying %s, but it has no remote address, hence nothing is "
"removed from the connection dictionary",
closed_connection,
)
return

-        disconnected = False
-        removed = False
-        trigger_reconnection = False
-        async with self._lock:
-            connection = self.active_connections.get(remote_uuid, None)
-            if connection == closed_connection:
-                self.active_connections.pop(remote_uuid, None)
-                removed = True
-                _logger.info(
-                    "Removed connection to %s:%s, connection: %s",
-                    remote_address,
-                    remote_uuid,
-                    connection,
-                )
-
-            if not self.active_connections:
-                trigger_reconnection = True
-                if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
-                    disconnected = True
+        disconnected, removed, trigger_reconnection = await self._determine_connection_state(
+            closed_connection, unsafe=unsafe
+        )

if disconnected:
self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
@@ -359,9 +340,40 @@ async def on_connection_close(self, closed_connection):
            _logger.debug(
                "Destroying %s, but there is no mapping for %s in the connection dictionary",
                closed_connection,
-                remote_uuid,
+                closed_connection.remote_uuid,
            )

+    async def _determine_connection_state(
+        self, closed_connection, unsafe=False
+    ) -> Tuple[bool, bool, bool]:
+        if unsafe:
+            return self._determine_connection_state_unsafe(closed_connection)
+        async with self._lock:
+            return self._determine_connection_state_unsafe(closed_connection)
+
+    def _determine_connection_state_unsafe(self, closed_connection) -> Tuple[bool, bool, bool]:
+        remote_uuid = closed_connection.remote_uuid
+        disconnected = False
+        removed = False
+        trigger_reconnection = False
+        connection = self.active_connections.get(remote_uuid, None)
+        if connection == closed_connection:
+            self.active_connections.pop(remote_uuid, None)
+            removed = True
+            _logger.info(
+                "Removed connection to %s:%s, connection: %s",
+                closed_connection.remote_address,
+                remote_uuid,
+                connection,
+            )
+
+        if not self.active_connections:
+            trigger_reconnection = True
+            if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
+                disconnected = True
+
+        return disconnected, removed, trigger_reconnection

def check_invocation_allowed(self):
state = self._client_state
if state == ClientState.INITIALIZED_ON_CLUSTER and self.active_connections:
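Editor's note: the new `unsafe` flag threads "the caller already holds `self._lock`" through `close_connection`, `on_connection_close`, and `_determine_connection_state`. An `asyncio.Lock` is not reentrant, so re-acquiring it from a task that is already inside `async with self._lock:` would deadlock. A standalone sketch of the pattern under that assumption; the class and method names here are hypothetical, not from this PR:

```python
import asyncio

class ConnectionRegistry:
    """Toy stand-in for the connection manager."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.active = {}

    async def remove(self, key, unsafe=False):
        # unsafe=True promises the caller already holds self._lock;
        # acquiring the non-reentrant lock again here would deadlock.
        if unsafe:
            return self._remove_unsafe(key)
        async with self._lock:
            return self._remove_unsafe(key)

    def _remove_unsafe(self, key):
        return self.active.pop(key, None)

    async def replace(self, key, conn):
        async with self._lock:
            old = await self.remove(key, unsafe=True)  # lock already held
            self.active[key] = conn
            return old

async def main():
    reg = ConnectionRegistry()
    await reg.replace("uuid-1", "conn-A")
    await reg.replace("uuid-1", "conn-B")
    print(reg.active)  # {'uuid-1': 'conn-B'}

asyncio.run(main())
```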
@@ -464,6 +476,12 @@ def _init_wait_strategy(self, config):
def _start_connect_all_members_timer(self):
connecting_uuids = set()

+        async def connect_to_member(member):
+            try:
+                await self._get_or_connect_to_member(member)
+            except Exception:
+                _logger.debug("Error connecting to %s in reconnect timer", member, exc_info=True)
+
async def run():
await asyncio.sleep(1)
if not self._lifecycle_service.running:
@@ -480,7 +498,7 @@ async def run():
connecting_uuids.add(member_uuid)
if not self._lifecycle_service.running:
break
-                        tg.create_task(self._get_or_connect_to_member(member))
+                        tg.create_task(connect_to_member(member))
member_uuids.append(member_uuid)

for item in member_uuids:
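A note on `connect_to_member`: `tg` here appears to be an `asyncio.TaskGroup` created in the surrounding code (not shown in this hunk). In a task group, the first task that raises a non-cancellation exception cancels all siblings and fails the group, so a single unreachable member would have aborted the whole reconnect sweep. Wrapping the coroutine to log and swallow errors keeps the timer alive. A reduced sketch of that failure mode, assuming Python 3.11+ and illustrative names:

```python
import asyncio

async def flaky(i):
    if i == 2:
        raise OSError("connection refused")  # one bad member
    await asyncio.sleep(0.01)
    return i

async def guarded(i):
    try:
        return await flaky(i)
    except Exception as e:
        print(f"member {i} failed: {e}")  # logged, siblings keep running

async def main():
    async with asyncio.TaskGroup() as tg:
        for i in range(4):
            # Without guarded(), the OSError would cancel every sibling
            # task and surface as an ExceptionGroup from the TaskGroup.
            tg.create_task(guarded(i))

asyncio.run(main())
```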
@@ -658,49 +676,54 @@ async def _handle_successful_auth(self, response, connection):

existing = self.active_connections.get(remote_uuid, None)

-        if existing:
-            await connection.close_connection(
-                "Duplicate connection to same member with UUID: %s" % remote_uuid, None
-            )
-            return existing
-
-        new_cluster_id = response["cluster_id"]
-        changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
-        if changed_cluster:
-            await self._check_client_state_on_cluster_change(connection)
-            _logger.warning(
-                "Switching from current cluster: %s to new cluster: %s",
-                self._cluster_id,
-                new_cluster_id,
-            )
-            self._on_cluster_restart()
+        if existing:
+            await connection.close_connection(
+                "Duplicate connection to same member with UUID: %s" % remote_uuid,
+                None,
+                unsafe=True,
+            )
+            return existing
+
+        new_cluster_id = response["cluster_id"]
+        changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
+        if changed_cluster:
+            await self._check_client_state_on_cluster_change(connection)
+            _logger.warning(
+                "Switching from current cluster: %s to new cluster: %s",
+                self._cluster_id,
+                new_cluster_id,
+            )
+            self._on_cluster_restart()

async with self._lock:
is_initial_connection = not self.active_connections
self.active_connections[remote_uuid] = connection
fire_connected_lifecycle_event = False

-            if is_initial_connection:
-                self._cluster_id = new_cluster_id
-                # In split brain, the client might connect to the one half
-                # of the cluster, and then later might reconnect to the
-                # other half, after the half it was connected to is
-                # completely dead. Since the cluster id is preserved in
-                # split brain scenarios, it is impossible to distinguish
-                # reconnection to the same cluster vs reconnection to the
-                # other half of the split brain. However, in the latter,
-                # we might need to send some state to the other half of
-                # the split brain (like Compact schemas). That forces us
-                # to send the client state to the cluster after the first
-                # cluster connection, regardless the cluster id is
-                # changed or not.
-                if self._established_initial_cluster_connection:
-                    self._client_state = ClientState.CONNECTED_TO_CLUSTER
-                    await self._initialize_on_cluster(new_cluster_id)
-                else:
-                    fire_connected_lifecycle_event = True
-                    self._established_initial_cluster_connection = True
-                    self._client_state = ClientState.INITIALIZED_ON_CLUSTER
+            init_on_cluster = False
+            if is_initial_connection:
+                self._cluster_id = new_cluster_id
+                # In split brain, the client might connect to the one half
+                # of the cluster, and then later might reconnect to the
+                # other half, after the half it was connected to is
+                # completely dead. Since the cluster id is preserved in
+                # split brain scenarios, it is impossible to distinguish
+                # reconnection to the same cluster vs reconnection to the
+                # other half of the split brain. However, in the latter,
+                # we might need to send some state to the other half of
+                # the split brain (like Compact schemas). That forces us
+                # to send the client state to the cluster after the first
+                # cluster connection, regardless the cluster id is
+                # changed or not.
+                if self._established_initial_cluster_connection:
+                    self._client_state = ClientState.CONNECTED_TO_CLUSTER
+                    init_on_cluster = True
+                else:
+                    fire_connected_lifecycle_event = True
+                    self._established_initial_cluster_connection = True
+                    self._client_state = ClientState.INITIALIZED_ON_CLUSTER
+
+        if init_on_cluster:
+            await self._initialize_on_cluster(new_cluster_id)

if fire_connected_lifecycle_event:
self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
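Editor's note on `init_on_cluster`: previously `_initialize_on_cluster` was awaited while `self._lock` was held; the flag defers that await until the `async with` block has exited, which shortens the critical section and keeps other coroutines from blocking on the lock during cluster initialization. The shape of the refactor, reduced to a standalone sketch with illustrative names:

```python
import asyncio

_lock = asyncio.Lock()
state = {"initialized": False}

async def initialize_on_cluster():
    await asyncio.sleep(0.01)  # stands in for network round-trips
    state["initialized"] = True

async def on_auth(first_connection: bool):
    init = False
    async with _lock:
        # Only decide what to do under the lock; defer slow awaits.
        if first_connection:
            init = True
    if init:
        await initialize_on_cluster()  # runs with the lock released

asyncio.run(on_auth(True))
```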
@@ -777,7 +800,7 @@ async def _check_client_state_on_cluster_change(self, connection):
# we can operate on. In those scenarios, we rely on the fact that we will
# reopen the connections.
reason = "Connection does not belong to the cluster %s" % self._cluster_id
-        await connection.close_connection(reason, None)
+        await connection.close_connection(reason, None, unsafe=True)
raise ValueError(reason)

def _on_cluster_restart(self):
@@ -985,13 +1008,13 @@ def send_message(self, message):
self._write(message.buf)
return True

-    # Not named close to distinguish it from the asyncore.dispatcher.close.
-    async def close_connection(self, reason, cause):
+    async def close_connection(self, reason, cause, unsafe=False):
"""Closes the connection.

Args:
reason (str): The reason this connection is going to be closed. Is allowed to be None.
cause (Exception): The exception responsible for closing this connection. Is allowed to be None.
+            unsafe (bool): Do not acquire a lock
"""
if not self.live:
return
@@ -1003,7 +1026,7 @@ async def close_connection(self, reason, cause):
self._inner_close()
except Exception:
_logger.exception("Error while closing the the connection %s", self)
-        await self._connection_manager.on_connection_close(self)
+        await self._connection_manager.on_connection_close(self, unsafe=unsafe)

def _log_close(self, reason, cause):
msg = "%s closed. Reason: %s"
22 changes: 18 additions & 4 deletions hazelcast/internal/asyncio_proxy/manager.py
@@ -1,3 +1,4 @@
+import asyncio
import typing

from hazelcast.internal.asyncio_proxy.vector_collection import (
@@ -29,11 +30,24 @@ def __init__(self, context):

async def get_or_create(self, service_name, name, create_on_remote=True):
ns = (service_name, name)
-        if ns in self._proxies:
-            return self._proxies[ns]
+        proxy = self._proxies.get(ns)
+        if proxy is not None:
+            if isinstance(proxy, asyncio.Future):
+                return await proxy
+            return proxy

-        proxy = await self._create_proxy(service_name, name, create_on_remote)
+        # allocate the proxy slot, so a task that tries to access the same proxy knows it's being created
+        fut = asyncio.get_running_loop().create_future()
+        self._proxies[ns] = fut
+        try:
+            proxy = await self._create_proxy(service_name, name, create_on_remote)
+        except BaseException as e:
+            self._proxies.pop(ns, None)
+            fut.set_exception(e)
+            raise
+        # replace the placeholder with the proxy
+        self._proxies[ns] = proxy
+        fut.set_result(proxy)
return proxy

async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy:
@@ -59,4 +73,4 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True):
return False

def get_distributed_objects(self):
-        return to_list(self._proxies.values())
+        return to_list(v for v in self._proxies.values() if not isinstance(v, asyncio.Future))
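The placeholder-`Future` pattern above closes a check-then-act race in `get_or_create`: two tasks requesting the same `(service_name, name)` could both miss the cache and create the proxy remotely twice. Claiming the slot with a `Future` before the first `await` makes the second task wait for the first one's result, and `get_distributed_objects` then has to skip slots that are still futures. A standalone sketch of the same idea, with illustrative names:

```python
import asyncio

_cache: dict[str, object] = {}

async def expensive_create(name):
    await asyncio.sleep(0.01)  # stands in for the remote create call
    return f"proxy:{name}"

async def get_or_create(name):
    hit = _cache.get(name)
    if hit is not None:
        # A Future means another task is mid-creation; await its result.
        if isinstance(hit, asyncio.Future):
            return await hit
        return hit
    fut = asyncio.get_running_loop().create_future()
    _cache[name] = fut  # claim the slot before the first await
    try:
        proxy = await expensive_create(name)
    except BaseException as e:
        _cache.pop(name, None)   # let a later caller retry
        fut.set_exception(e)     # wake waiters with the failure
        raise
    _cache[name] = proxy         # replace the placeholder
    fut.set_result(proxy)
    return proxy

async def main():
    a, b = await asyncio.gather(get_or_create("m"), get_or_create("m"))
    assert a == b == "proxy:m"  # created exactly once

asyncio.run(main())
```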
27 changes: 18 additions & 9 deletions hazelcast/internal/asyncio_proxy/map.py
@@ -279,23 +279,32 @@ def handle_event_entry(
number_of_affected_entries,
)
if event.event_type == EntryEventType.ADDED:
-                added_func(event)
+                if added_func:
+                    added_func(event)
            elif event.event_type == EntryEventType.REMOVED:
-                removed_func(event)
+                if removed_func:
+                    removed_func(event)
            elif event.event_type == EntryEventType.UPDATED:
-                updated_func(event)
+                if updated_func:
+                    updated_func(event)
            elif event.event_type == EntryEventType.EVICTED:
-                evicted_func(event)
+                if evicted_func:
+                    evicted_func(event)
            elif event.event_type == EntryEventType.EVICT_ALL:
-                evict_all_func(event)
+                if evict_all_func:
+                    evict_all_func(event)
            elif event.event_type == EntryEventType.CLEAR_ALL:
-                clear_all_func(event)
+                if clear_all_func:
+                    clear_all_func(event)
            elif event.event_type == EntryEventType.MERGED:
-                merged_func(event)
+                if merged_func:
+                    merged_func(event)
            elif event.event_type == EntryEventType.EXPIRED:
-                expired_func(event)
+                if expired_func:
+                    expired_func(event)
            elif event.event_type == EntryEventType.LOADED:
-                loaded_func(event)
+                if loaded_func:
+                    loaded_func(event)

return await self._register_listener(
request,
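These guards matter because a listener usually registers only a subset of the callbacks, leaving the rest as `None`; before this change, an event of an unregistered type raised `TypeError: 'NoneType' object is not callable` inside the handler. The fix, reduced to a minimal runnable sketch with illustrative names:

```python
def handle(event_type, added_func=None, removed_func=None, event="evt"):
    # Each callback may be None when the listener didn't register it.
    if event_type == "ADDED":
        if added_func:
            added_func(event)
    elif event_type == "REMOVED":
        if removed_func:
            removed_func(event)

handle("ADDED", added_func=print)    # prints "evt"
handle("REMOVED", added_func=print)  # no-op instead of TypeError
```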
1 change: 1 addition & 0 deletions hazelcast/internal/asyncio_proxy/vector_collection.py
@@ -410,6 +410,7 @@ def handler(message):
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.put_if_absent, key, document)
+        document = copy.copy(document)
document.value = value_data
request = vector_collection_put_if_absent_codec.encode_request(
self.name,
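A note on the added `copy.copy(document)`: `put_if_absent` replaces `document.value` with its serialized form, and without the copy that mutation would leak into the caller's object (and into the retry path above, which re-invokes `put_if_absent` with the original `document`). The shallow copy keeps the mutation private. A minimal sketch of the hazard, with illustrative types:

```python
import copy
from dataclasses import dataclass

@dataclass
class Document:
    value: object

def to_data(value):
    return b"serialized:" + str(value).encode()  # stands in for _to_data

def put_if_absent(document: Document):
    value_data = to_data(document.value)
    document = copy.copy(document)  # shallow copy: caller's object untouched
    document.value = value_data     # mutate only our private copy
    return document

doc = Document(value=42)
put_if_absent(doc)
assert doc.value == 42  # without the copy, this would now be bytes
```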