51 changes: 51 additions & 0 deletions docs/source/examples/basic_task_label.py
@@ -0,0 +1,51 @@
"""Basic task label example"""

import ipyparallel as ipp

# start up ipp cluster with 2 engines
cluster = ipp.Cluster(n=2)
cluster.start_cluster_sync()

rc = cluster.connect_client_sync()
rc.wait_for_engines(n=2)


def wait(t):
import time

tic = time.time()
time.sleep(t)
return time.time() - tic


# use load balanced view
bview = rc.load_balanced_view()
ar_list_b1 = [
bview.set_flags(label=f"mylabel_map_{i:02}").map_async(wait, [2]) for i in range(10)
]
ar_list_b2 = [
bview.set_flags(label=f"mylabel_map_{i:02}").apply_async(wait, 2) for i in range(10)
]
bview.wait(ar_list_b1)
bview.wait(ar_list_b2)


# use direct view
dview = rc[:]
ar_list_d1 = [
dview.set_flags(label=f"mylabel_map_{i + 10:02}").apply_async(wait, 2)
for i in range(10)
]
ar_list_d2 = [
dview.set_flags(label=f"mylabel_map_{i + 10:02}").map_async(wait, [2])
for i in range(10)
]
dview.wait(ar_list_d1)
dview.wait(ar_list_d2)

# query database
data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'])
for d in data:
print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}")

cluster.stop_cluster_sync()
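The chained set_flags(...).apply_async(...) pattern above works because set_flags now returns the view. The same labeling can also be expressed with the existing temp_flags context manager, which restores the previous flag values afterwards. A minimal sketch, assuming it is placed before the cluster is stopped and reusing bview and wait from the example; the label string is illustrative:

with bview.temp_flags(label="mylabel_ctx"):
    ar = bview.apply_async(wait, 2)
ar.wait()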
2 changes: 2 additions & 0 deletions ipyparallel/client/client.py
Expand Up @@ -220,6 +220,7 @@ def __init__(self, *args, **kwargs):
'stderr': '',
'outputs': [],
'data': {},
'label': None,
}
self.update(md)
self.update(dict(*args, **kwargs))
@@ -822,6 +823,7 @@ def _extract_metadata(self, msg):
'status': content['status'],
'is_broadcast': msg_meta.get('is_broadcast', False),
'is_coalescing': msg_meta.get('is_coalescing', False),
'label': msg_meta.get('label', None),
}

if md['engine_uuid'] is not None:
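With 'label' added to the client-side Metadata defaults and threaded through _extract_metadata, the submitted label is also visible on the client in each result's metadata. A minimal sketch, assuming a running cluster, an already-connected Client, and that AsyncResult.metadata is the usual per-task list of metadata dicts:

import ipyparallel as ipp

rc = ipp.Client()
view = rc.load_balanced_view()
ar = view.set_flags(label="demo").apply_async(pow, 2, 10)
ar.wait()
# each metadata entry should now carry the label it was submitted with
print([md.get('label') for md in ar.metadata])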
71 changes: 63 additions & 8 deletions ipyparallel/client/view.py
@@ -98,14 +98,15 @@ class View(HasTraits):
block = Bool(False)
track = Bool(False)
targets = Any()
label = Any()

history = List()
outstanding = Set()
results = Dict()
client = Instance('ipyparallel.Client', allow_none=True)

_socket = Any()
_flag_names = List(['targets', 'block', 'track'])
_flag_names = List(['targets', 'block', 'track', 'label'])
_in_sync_results = Bool(False)
_targets = Any()
_idents = Any()
@@ -155,6 +156,8 @@ def set_flags(self, **kwargs):
else:
setattr(self, name, value)

return self  # return self so map/apply can be chained directly after set_flags (no context manager needed)

@contextmanager
def temp_flags(self, **kwargs):
"""temporarily set flags, for use in `with` statements.
@@ -530,7 +533,14 @@ def use_pickle(self):
@sync_results
@save_ids
def _really_apply(
self, f, args=None, kwargs=None, targets=None, block=None, track=None
self,
f,
args=None,
kwargs=None,
targets=None,
block=None,
track=None,
label=None,
):
"""calls f(*args, **kwargs) on remote engines, returning the result.

@@ -562,6 +572,8 @@ def _really_apply(
block = self.block if block is None else block
track = self.track if track is None else track
targets = self.targets if targets is None else targets
label = self.label if label is None else label
metadata = dict(label=label)

_idents, _targets = self.client._build_targets(targets)
futures = []
@@ -572,7 +584,13 @@

for ident in _idents:
future = self.client.send_apply_request(
self._socket, pf, pargs, pkwargs, track=track, ident=ident
self._socket,
pf,
pargs,
pkwargs,
track=track,
ident=ident,
metadata=metadata,
)
futures.append(future)
if track:
@@ -592,7 +610,15 @@
return ar

@sync_results
def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
def map(
self,
f,
*sequences,
block=None,
track=False,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

There will be one task per target, so work will be chunked
@@ -630,10 +656,17 @@ def map(self, f, *sequences, block=None, track=False, return_exceptions=False):

if block is None:
block = self.block
if label is None:
label = self.label

assert len(sequences) > 0, "must have some sequences to map onto!"
pf = ParallelFunction(
self, f, block=block, track=track, return_exceptions=return_exceptions
self,
f,
block=block,
track=track,
return_exceptions=return_exceptions,
label=label,
)
return pf.map(*sequences)

@@ -1036,7 +1069,15 @@ def _broadcast_map(f, *sequence_names):
return list(map(f, *sequences))

@_not_coalescing
def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
def map(
self,
f,
*sequences,
block=None,
track=False,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

There will be one task per engine, so work will be chunked
@@ -1176,10 +1217,11 @@ class LoadBalancedView(View):
after = Any()
timeout = CFloat()
retries = Integer(0)
label = Any()

_task_scheme = Any()
_flag_names = List(
['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']
['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries', 'label']
)
_outstanding_maps = Set()

@@ -1275,6 +1317,8 @@ def set_flags(self, **kwargs):

self.timeout = t

return self  # return self so map/apply can be chained directly after set_flags (no context manager needed)

@sync_results
@save_ids
def _really_apply(
@@ -1289,6 +1333,7 @@ def _really_apply(
timeout=None,
targets=None,
retries=None,
label=None,
):
"""calls f(*args, **kwargs) on a remote engine, returning the result.

@@ -1344,6 +1389,7 @@ def _really_apply(
follow = self.follow if follow is None else follow
timeout = self.timeout if timeout is None else timeout
targets = self.targets if targets is None else targets
label = self.label if label is None else label

if not isinstance(retries, int):
raise TypeError(f'retries must be int, not {type(retries)!r}')
@@ -1358,7 +1404,12 @@
after = self._render_dependency(after)
follow = self._render_dependency(follow)
metadata = dict(
after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
after=after,
follow=follow,
timeout=timeout,
targets=idents,
retries=retries,
label=label,
)

future = self.client.send_apply_request(
@@ -1389,6 +1440,7 @@ def map(
chunksize=1,
ordered=True,
return_exceptions=False,
label=None,
):
"""Parallel version of builtin `map`, load-balanced by this View.

@@ -1433,6 +1485,8 @@ def map(
# default
if block is None:
block = self.block
if label is None:
label = self.label

assert len(sequences) > 0, "must have some sequences to map onto!"

@@ -1443,6 +1497,7 @@
chunksize=chunksize,
ordered=ordered,
return_exceptions=return_exceptions,
label=label,
)
return pf.map(*sequences)

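Because 'label' is registered in _flag_names, set_flags(label=...) persists on the view until it is changed again, just like block or track; temp_flags is the way to scope a label to a few submissions. An illustrative sketch, assuming a running cluster to connect to (the label strings are placeholders):

import ipyparallel as ipp

rc = ipp.Client()
view = rc.load_balanced_view()
view.set_flags(label="batch-A")      # persists on this view
ar1 = view.apply_async(pow, 2, 10)   # recorded with label "batch-A"
view.set_flags(label=None)           # clear it again
ar2 = view.apply_async(pow, 2, 10)   # recorded with label None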
2 changes: 2 additions & 0 deletions ipyparallel/controller/hub.py
@@ -75,6 +75,7 @@ def empty_record():
'error': None,
'stdout': '',
'stderr': '',
'label': None,
}


@@ -111,6 +112,7 @@ def init_record(msg):
'error': None,
'stdout': '',
'stderr': '',
'label': msg['metadata'].get('label', None),
}


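Since the hub now seeds every task record with the submitted label, labeled tasks can be looked up later through the client's database interface. A small sketch, assuming a connected client rc and a label string that was actually used (e.g. one from the example above):

records = rc.db_query({'label': 'mylabel_map_00'}, keys=['msg_id', 'completed'])
for rec in records:
    print(rec['msg_id'], rec['completed'])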
14 changes: 13 additions & 1 deletion ipyparallel/controller/mongodb.py
@@ -1,7 +1,7 @@
"""A TaskRecord backend using mongodb"""

try:
from pymongo import MongoClient
from pymongo import MongoClient, version
except ImportError:
from pymongo import Connection as MongoClient

@@ -15,6 +15,11 @@

from .dictdb import BaseDB

# we need to determine the pymongo version because of API changes. see
# https://pymongo.readthedocs.io/en/stable/migrate-to-pymongo4.html
pymongo_version_major = int(version.split('.')[0])
pymongo_version_minor = int(version.split('.')[1])

# -----------------------------------------------------------------------------
# MongoDB class
# -----------------------------------------------------------------------------
@@ -56,6 +61,13 @@ def __init__(self, **kwargs):
self.database = self.session
self._db = self._connection[self.database]
self._records = self._db['task_records']
if pymongo_version_major >= 4:
# mimic the old 3.x API on pymongo 4+
self._records.insert = self._records.insert_one
self._records.update = self._records.update_one
self._records.ensure_index = self._records.create_index
self._records.remove = self._records.delete_many

self._records.ensure_index('msg_id', unique=True)
self._records.ensure_index('submitted') # for sorting history
# for rec in self._records.find
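With the MongoDB backend, the new label field is just another key on each document in the task_records collection, so labeled tasks can also be inspected directly with pymongo. A hedged sketch, assuming the controller's MongoDB runs locally and that the backend's database name (which defaults to the session id) is known; the placeholder must be filled in:

from pymongo import MongoClient

client = MongoClient()           # assumption: MongoDB reachable on localhost
db = client["<session-name>"]    # assumption: replace with the backend's database name
cursor = db["task_records"].find({"label": {"$ne": None}}, {"msg_id": 1, "label": 1})
for rec in cursor:
    print(rec["msg_id"], rec["label"])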
5 changes: 4 additions & 1 deletion ipyparallel/controller/sqlitedb.py
@@ -155,6 +155,7 @@ class SQLiteDB(BaseDB):
'error',
'stdout',
'stderr',
'label',
]
)
# sqlite datatypes for checking that db is current format
@@ -182,6 +183,7 @@ class SQLiteDB(BaseDB):
'error': 'text',
'stdout': 'text',
'stderr': 'text',
'label': 'text',
}
)

@@ -303,7 +305,8 @@ def _init_db(self):
execute_result text,
error text,
stdout text,
stderr text)
stderr text,
label text)
"""
)
self._db.commit()
1 change: 1 addition & 0 deletions ipyparallel/engine/kernel.py
@@ -76,6 +76,7 @@ def init_metadata(self, parent):
'is_broadcast': parent_metadata.get('is_broadcast', False),
'is_coalescing': parent_metadata.get('is_coalescing', False),
'original_msg_id': parent_metadata.get('original_msg_id', ''),
'label': parent_metadata.get('label', None),
}

def finish_metadata(self, parent, metadata, reply_content):