Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hazelcast/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def start(self, connection_manager, membership_listeners):
self.add_listener(*listener)

def get_member(self, member_uuid):
    """Return the member with the given UUID from the current member-list
    snapshot, or ``None`` if no such member is known.

    Args:
        member_uuid: UUID of the member to look up; must not be ``None``.

    Raises:
        AssertionError/Error from ``check_not_none`` when ``member_uuid`` is ``None``.
    """
    # Validate the actual parameter (the diffed bug validated the stdlib
    # ``uuid`` module by mistake, which is always non-None).
    check_not_none(member_uuid, "UUID must not be null")
    # Read the snapshot reference once so the lookup is consistent even if
    # the snapshot is swapped concurrently.
    snapshot = self._member_list_snapshot
    return snapshot.members.get(member_uuid, None)

Expand Down
2 changes: 1 addition & 1 deletion hazelcast/internal/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class HazelcastClient:

from hazelcast.asyncio import HazelcastClient

client = await HazelcastClient.crate_and_start(
client = await HazelcastClient.create_and_start(
cluster_name="a-cluster",
)

Expand Down
27 changes: 17 additions & 10 deletions hazelcast/internal/asyncio_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ def __init__(self, client, config):
self._listeners = {}
self._member_list_snapshot = _EMPTY_SNAPSHOT
self._initial_list_fetched = asyncio.Event()
# asyncio tasks are weakly referenced; keep strong refs until they finish.
# see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
self._close_tasks: typing.Set[asyncio.Task] = set()

def start(self, connection_manager, membership_listeners):
self._connection_manager = connection_manager
for listener in membership_listeners:
self.add_listener(*listener)

def get_member(self, member_uuid):
    """Return the member with the given UUID from the current member-list
    snapshot, or ``None`` if no such member is known.

    Args:
        member_uuid: UUID of the member to look up; must not be ``None``.

    Raises:
        AssertionError/Error from ``check_not_none`` when ``member_uuid`` is ``None``.
    """
    # Validate the actual parameter (the diffed bug validated the stdlib
    # ``uuid`` module by mistake, which is always non-None).
    check_not_none(member_uuid, "UUID must not be null")
    # Read the snapshot reference once so the lookup is consistent even if
    # the snapshot is swapped concurrently.
    snapshot = self._member_list_snapshot
    return snapshot.members.get(member_uuid, None)

Expand Down Expand Up @@ -259,15 +262,15 @@ def _fire_membership_events(self, dead_members, new_members):
if handler:
try:
handler(dead_member)
except:
except Exception:
_logger.exception("Exception in membership listener")

for new_member in new_members:
for handler, _ in self._listeners.values():
if handler:
try:
handler(new_member)
except:
except Exception:
_logger.exception("Exception in membership listener")

def _detect_membership_events(self, previous_members, current_members):
Expand All @@ -282,14 +285,18 @@ def _detect_membership_events(self, previous_members, current_members):
for dead_member in dead_members:
connection = self._connection_manager.get_connection(dead_member.uuid)
if connection:
connection.close_connection(
None,
TargetDisconnectedError(
"The client has closed the connection to this member, "
"after receiving a member left event from the cluster. "
"%s" % connection
),
task = asyncio.create_task(
connection.close_connection(
None,
TargetDisconnectedError(
"The client has closed the connection to this member, "
"after receiving a member left event from the cluster. "
"%s" % connection
),
)
)
self._close_tasks.add(task)
task.add_done_callback(self._close_tasks.discard)

if (len(new_members) + len(dead_members)) > 0:
if len(current_members) > 0:
Expand Down
1 change: 1 addition & 0 deletions hazelcast/internal/asyncio_compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def _replicate_schema(
# is not known to be replicated yet. We should retry
# sending it in a random member.
await asyncio.sleep(self._invocation_retry_pause)
remaining_retries -= 1

# We tried to send it a couple of times, but the member list
# in our local and the member list returned by the initiator
Expand Down
154 changes: 86 additions & 68 deletions hazelcast/internal/asyncio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import struct
import time
import uuid
from typing import Coroutine
from typing import Coroutine, Tuple

from hazelcast import __version__
from hazelcast.config import ReconnectMode
Expand Down Expand Up @@ -307,37 +307,16 @@ async def connect_to_all_cluster_members(self, sync_start):

self._start_connect_all_members_timer()

async def on_connection_close(self, closed_connection):
remote_uuid = closed_connection.remote_uuid
remote_address = closed_connection.remote_address

if not remote_address:
async def on_connection_close(self, closed_connection, unsafe=False):
if not closed_connection.remote_address:
_logger.debug(
"Destroying %s, but it has no remote address, hence nothing is "
"removed from the connection dictionary",
closed_connection,
)
return

disconnected = False
removed = False
trigger_reconnection = False
async with self._lock:
connection = self.active_connections.get(remote_uuid, None)
if connection == closed_connection:
self.active_connections.pop(remote_uuid, None)
removed = True
_logger.info(
"Removed connection to %s:%s, connection: %s",
remote_address,
remote_uuid,
connection,
)

if not self.active_connections:
trigger_reconnection = True
if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
disconnected = True
disconnected, removed, trigger_reconnection = await self._determine_connection_state(closed_connection, unsafe=unsafe)

if disconnected:
self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
Expand All @@ -359,9 +338,39 @@ async def on_connection_close(self, closed_connection):
_logger.debug(
"Destroying %s, but there is no mapping for %s in the connection dictionary",
closed_connection,
closed_connection.remote_uuid,
)

async def _determine_connection_state(self, closed_connection, unsafe=False) -> Tuple[bool, bool, bool]:
if unsafe:
return self._determine_connection_state_unsafe(closed_connection)
async with self._lock:
return self._determine_connection_state_unsafe(closed_connection)

def _determine_connection_state_unsafe(self, closed_connection) -> Tuple[bool, bool, bool]:
remote_uuid = closed_connection.remote_uuid
disconnected = False
removed = False
trigger_reconnection = False
connection = self.active_connections.get(remote_uuid, None)
if connection == closed_connection:
self.active_connections.pop(remote_uuid, None)
removed = True
_logger.info(
"Removed connection to %s:%s, connection: %s",
closed_connection.remote_address,
remote_uuid,
connection,
)

if not self.active_connections:
trigger_reconnection = True
if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
disconnected = True

return disconnected, removed, trigger_reconnection


def check_invocation_allowed(self):
state = self._client_state
if state == ClientState.INITIALIZED_ON_CLUSTER and self.active_connections:
Expand Down Expand Up @@ -464,6 +473,12 @@ def _init_wait_strategy(self, config):
def _start_connect_all_members_timer(self):
connecting_uuids = set()

async def connect_to_member(member):
try:
await self._get_or_connect_to_member(member)
except Exception:
_logger.debug("Error connecting to %s in reconnect timer", member, exc_info=True)

async def run():
await asyncio.sleep(1)
if not self._lifecycle_service.running:
Expand All @@ -480,7 +495,7 @@ async def run():
connecting_uuids.add(member_uuid)
if not self._lifecycle_service.running:
break
tg.create_task(self._get_or_connect_to_member(member))
tg.create_task(connect_to_member(member))
member_uuids.append(member_uuid)

for item in member_uuids:
Expand Down Expand Up @@ -658,49 +673,52 @@ async def _handle_successful_auth(self, response, connection):

existing = self.active_connections.get(remote_uuid, None)

if existing:
await connection.close_connection(
"Duplicate connection to same member with UUID: %s" % remote_uuid, None
)
return existing

new_cluster_id = response["cluster_id"]
changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
if changed_cluster:
await self._check_client_state_on_cluster_change(connection)
_logger.warning(
"Switching from current cluster: %s to new cluster: %s",
self._cluster_id,
new_cluster_id,
)
self._on_cluster_restart()
if existing:
await connection.close_connection(
"Duplicate connection to same member with UUID: %s" % remote_uuid, None, unsafe=True
)
return existing

new_cluster_id = response["cluster_id"]
changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
if changed_cluster:
await self._check_client_state_on_cluster_change(connection)
_logger.warning(
"Switching from current cluster: %s to new cluster: %s",
self._cluster_id,
new_cluster_id,
)
self._on_cluster_restart()

async with self._lock:
is_initial_connection = not self.active_connections
self.active_connections[remote_uuid] = connection
fire_connected_lifecycle_event = False

if is_initial_connection:
self._cluster_id = new_cluster_id
# In split brain, the client might connect to the one half
# of the cluster, and then later might reconnect to the
# other half, after the half it was connected to is
# completely dead. Since the cluster id is preserved in
# split brain scenarios, it is impossible to distinguish
# reconnection to the same cluster vs reconnection to the
# other half of the split brain. However, in the latter,
# we might need to send some state to the other half of
# the split brain (like Compact schemas). That forces us
# to send the client state to the cluster after the first
# cluster connection, regardless the cluster id is
# changed or not.
if self._established_initial_cluster_connection:
self._client_state = ClientState.CONNECTED_TO_CLUSTER
await self._initialize_on_cluster(new_cluster_id)
else:
fire_connected_lifecycle_event = True
self._established_initial_cluster_connection = True
self._client_state = ClientState.INITIALIZED_ON_CLUSTER
init_on_cluster = False
if is_initial_connection:
self._cluster_id = new_cluster_id
# In split brain, the client might connect to the one half
# of the cluster, and then later might reconnect to the
# other half, after the half it was connected to is
# completely dead. Since the cluster id is preserved in
# split brain scenarios, it is impossible to distinguish
# reconnection to the same cluster vs reconnection to the
# other half of the split brain. However, in the latter,
# we might need to send some state to the other half of
# the split brain (like Compact schemas). That forces us
# to send the client state to the cluster after the first
# cluster connection, regardless the cluster id is
# changed or not.
if self._established_initial_cluster_connection:
self._client_state = ClientState.CONNECTED_TO_CLUSTER
init_on_cluster = True
else:
fire_connected_lifecycle_event = True
self._established_initial_cluster_connection = True
self._client_state = ClientState.INITIALIZED_ON_CLUSTER

if init_on_cluster:
await self._initialize_on_cluster(new_cluster_id)

if fire_connected_lifecycle_event:
self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
Expand Down Expand Up @@ -777,7 +795,7 @@ async def _check_client_state_on_cluster_change(self, connection):
# we can operate on. In those scenarios, we rely on the fact that we will
# reopen the connections.
reason = "Connection does not belong to the cluster %s" % self._cluster_id
await connection.close_connection(reason, None)
await connection.close_connection(reason, None, unsafe=True)
raise ValueError(reason)

def _on_cluster_restart(self):
Expand Down Expand Up @@ -985,13 +1003,13 @@ def send_message(self, message):
self._write(message.buf)
return True

# Not named close to distinguish it from the asyncore.dispatcher.close.
async def close_connection(self, reason, cause):
async def close_connection(self, reason, cause, unsafe=False):
"""Closes the connection.

Args:
reason (str): The reason this connection is going to be closed. Is allowed to be None.
cause (Exception): The exception responsible for closing this connection. Is allowed to be None.
unsafe (bool): Do not acquire a lock
"""
if not self.live:
return
Expand All @@ -1003,7 +1021,7 @@ async def close_connection(self, reason, cause):
self._inner_close()
except Exception:
_logger.exception("Error while closing the the connection %s", self)
await self._connection_manager.on_connection_close(self)
await self._connection_manager.on_connection_close(self, unsafe=unsafe)

def _log_close(self, reason, cause):
msg = "%s closed. Reason: %s"
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/internal/asyncio_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ async def translate(self, address):
async def refresh(self):
    """Refreshes the internal private-to-public address lookup table.

    ``discover_nodes`` performs blocking network I/O, so it is run in a
    worker thread via ``asyncio.to_thread`` to avoid stalling the event
    loop. Failures are logged and swallowed deliberately — a refresh
    failure keeps the previous (possibly stale) table usable.
    """
    try:
        self._private_to_public = await asyncio.to_thread(self.cloud_discovery.discover_nodes)
    except Exception as e:
        _logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e)
Loading
Loading