commit python3-jupyter_ipyparallel for openSUSE:Factory
Hello community, here is the log from the commit of package python3-jupyter_ipyparallel for openSUSE:Factory checked in at 2016-11-13 22:50:55 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python3-jupyter_ipyparallel (Old) and /work/SRC/openSUSE:Factory/.python3-jupyter_ipyparallel.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Package is "python3-jupyter_ipyparallel" Changes: -------- --- /work/SRC/openSUSE:Factory/python3-jupyter_ipyparallel/python3-jupyter_ipyparallel-doc.changes 2016-08-05 18:16:17.000000000 +0200 +++ /work/SRC/openSUSE:Factory/.python3-jupyter_ipyparallel.new/python3-jupyter_ipyparallel-doc.changes 2016-11-13 22:50:56.000000000 +0100 @@ -1,0 +2,14 @@ +Sun Oct 2 17:14:20 UTC 2016 - arun@gmx.de + +- update to version 5.2.0: + * Fix compatibility with changes in ipykernel 4.3, 4.4 + * Improve inspection of "@remote" decorated functions + * :meth:`Client.wait` accepts any Future. + * Add "--user" flag to :command:`ipcluster nbextension` + * Default to one core per worker in + :meth:`Client.become_distributed`. Override by specifying + `ncores` keyword-argument. + * Subprocess logs are no longer sent to files by default in + :command:`ipcluster`. + +------------------------------------------------------------------- python3-jupyter_ipyparallel.changes: same change Old: ---- ipyparallel-5.1.1.tar.gz New: ---- ipyparallel-5.2.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python3-jupyter_ipyparallel-doc.spec ++++++ --- /var/tmp/diff_new_pack.4a5HWE/_old 2016-11-13 22:50:58.000000000 +0100 +++ /var/tmp/diff_new_pack.4a5HWE/_new 2016-11-13 22:50:58.000000000 +0100 @@ -24,7 +24,7 @@ %endif Name: python3-jupyter_ipyparallel-doc -Version: 5.1.1 +Version: 5.2.0 Release: 0 Summary: Documentation for python-jupyter_ipyparallel License: BSD-3-Clause ++++++ python3-jupyter_ipyparallel.spec ++++++ --- /var/tmp/diff_new_pack.4a5HWE/_old 2016-11-13 22:50:58.000000000 +0100 +++ /var/tmp/diff_new_pack.4a5HWE/_new 2016-11-13 22:50:58.000000000 +0100 @@ -17,7 +17,7 @@ Name: python3-jupyter_ipyparallel -Version: 5.1.1 +Version: 5.2.0 Release: 0 Summary: Interactive Parallel Computing with IPython License: BSD-3-Clause ++++++ ipyparallel-5.1.1.tar.gz -> ipyparallel-5.2.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/PKG-INFO new/ipyparallel-5.2.0/PKG-INFO --- old/ipyparallel-5.1.1/PKG-INFO 2016-06-24 16:15:33.000000000 +0200 +++ new/ipyparallel-5.2.0/PKG-INFO 2016-08-17 21:49:40.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: ipyparallel -Version: 5.1.1 +Version: 5.2.0 Summary: Interactive Parallel Computing with IPython Home-page: http://ipython.org Author: IPython Development Team diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/README.md new/ipyparallel-5.2.0/README.md --- old/ipyparallel-5.1.1/README.md 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/README.md 2016-08-14 19:49:09.000000000 +0200 @@ -1,6 +1,12 @@ # Interactive Parallel Computing with IPython -ipyparallel is the new home of IPython.parallel. +ipyparallel is the new home of IPython.parallel. ipyparallel is a Python package and collection of CLI scripts for controlling clusters for Jupyter. + +ipyparallel contains the following CLI scripts: + +* ipcluster - start/stop a cluster +* ipcontroller - start a scheduler +* ipengine - start an engine ## Install @@ -17,10 +23,17 @@ ipcluster nbextension disable - See the [documentation on configuring the notebook server](https://jupyter-notebook.readthedocs.org/en/latest/public_server.html) to find your config or setup your initial `jupyter_notebook_config.py`. +### JupyterHub Install + +To install for all users on JupyterHub, as root: + + jupyter nbextension install --sys-prefix --py ipyparallel + jupyter nbextension enable --sys-prefix --py ipyparallel + jupyter serverextension enable --sys-prefix --py ipyparallel + ## Run Start a cluster: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/docs/source/changelog.rst new/ipyparallel-5.2.0/docs/source/changelog.rst --- old/ipyparallel-5.1.1/docs/source/changelog.rst 2016-06-24 16:15:05.000000000 +0200 +++ new/ipyparallel-5.2.0/docs/source/changelog.rst 2016-08-17 21:36:01.000000000 +0200 @@ -4,6 +4,18 @@ =========================== +5.2 +--- + +- Fix compatibility with changes in ipykernel 4.3, 4.4 +- Improve inspection of ``@remote`` decorated functions +- :meth:`Client.wait` accepts any Future. +- Add ``--user`` flag to :command:`ipcluster nbextension` +- Default to one core per worker in :meth:`Client.become_distributed`. + Override by specifying `ncores` keyword-argument. +- Subprocess logs are no longer sent to files by default in :command:`ipcluster`. + + 5.1 --- @@ -47,13 +59,6 @@ `Slurm https://computing.llnl.gov/tutorials/linux_clusters`_ support is added to ipcluster. -5.1.1 -~~~~~ - -5.1.1 fixes iteration through AsyncResults, which was broken in some instances by 5.0. - -`5.1.1 on GitHub https://github.com/ipython/ipyparallel/milestones/5.1.1`__ - 5.1.0 ~~~~~ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/docs/source/demos.rst new/ipyparallel-5.2.0/docs/source/demos.rst --- old/ipyparallel-5.1.1/docs/source/demos.rst 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/docs/source/demos.rst 2016-08-14 19:49:09.000000000 +0200 @@ -100,11 +100,12 @@ The overall idea of the calculation is simple: each IPython engine will compute the two digit counts for the digits in a single file. Then in a final step the counts from each engine will be added up. To perform this -calculation, we will need two top-level functions from :file:`pidigits.py`: +calculation, we will need two top-level functions from :file:`pidigits.py`, +:func:`compute_two_digit_freqs` and :func:`reduce_freqs`: .. literalinclude:: ../../examples/pi/pidigits.py :language: python - :lines: 47-62 + :lines: 52-67 We will also use the :func:`plot_two_digit_freqs` function to plot the results. The code to run this calculation in parallel is contained in diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/_version.py new/ipyparallel-5.2.0/ipyparallel/_version.py --- old/ipyparallel-5.1.1/ipyparallel/_version.py 2016-06-24 16:15:05.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/_version.py 2016-08-17 21:36:34.000000000 +0200 @@ -1,2 +1,2 @@ -version_info = (5, 1, 1) +version_info = (5, 2, 0) __version__ = '.'.join(map(str, version_info)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/ipclusterapp.py new/ipyparallel-5.2.0/ipyparallel/apps/ipclusterapp.py --- old/ipyparallel-5.1.1/ipyparallel/apps/ipclusterapp.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/apps/ipclusterapp.py 2016-08-08 14:08:17.000000000 +0200 @@ -290,7 +290,7 @@ early_shutdown = Integer(30, config=True, help="The timeout (in seconds)") _stopping = False - + aliases = Dict(engine_aliases) flags = Dict(engine_flags) @@ -325,7 +325,7 @@ if self.engine_launcher.running: self.log.info("Engines appear to have started successfully") self.early_shutdown = 0 - + def start_engines(self): # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH: n = getattr(self.engine_launcher, 'engine_count', self.n) @@ -343,21 +343,21 @@ if self.early_shutdown and not self._stopping: self.log.error(""" Engines shutdown early, they probably failed to connect. - + Check the engine log files for output. - + If your controller and engines are not on the same machine, you probably have to instruct the controller to listen on an interface other than localhost. - + You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args. - + Be sure to read our security docs before instructing your controller to listen on a public interface. """) self.stop_launchers() - + return self.engines_stopped(r) - + def engines_stopped(self, r): return self.loop.stop() @@ -402,7 +402,7 @@ if self.daemonize: if os.name=='posix': daemonize() - + self.loop.add_callback(self.start_engines) # Now write the new pid file AFTER our new forked pid is active. # self.write_pid_file() @@ -443,7 +443,7 @@ delay = CFloat(1., config=True, help="delay (in s) between starting the controller and the engines") - + controller_ip = Unicode(config=True, help="Set the IP address of the controller.") controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class") def _controller_launcher_changed(self, name, old, new): @@ -459,7 +459,7 @@ Each launcher class has its own set of configuration options, for making sure it will work in your environment. - + Note that using a batch launcher for the controller *does not* put it in the same batch job as the engines, so they will still start separately. @@ -498,11 +498,11 @@ if self.controller_ip: self.controller_launcher.controller_args.append('--ip=%s' % self.controller_ip) self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') - + def engines_stopped(self, r): """prevent parent.engines_stopped from stopping everything on engine shutdown""" pass - + def start_controller(self): self.log.info("Starting Controller with %s", self.controller_launcher_class) self.controller_launcher.on_stop(self.stop_launchers) @@ -550,7 +550,7 @@ if self.daemonize: if os.name=='posix': daemonize() - + def start(): self.start_controller() self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines) @@ -572,24 +572,28 @@ class IPClusterNBExtension(BaseIPythonApplication): """Enable/disable ipcluster tab extension in Jupyter notebook""" - + name = 'ipcluster-nbextension' - + description = """Enable/disable IPython clusters tab in Jupyter notebook - + for Jupyter Notebook >= 4.2, you can use the new nbextension API: - + jupyter serverextension enable --py ipyparallel jupyter nbextension install --py ipyparallel jupyter nbextension enable --py ipyparallel """ - + examples = """ ipcluster nbextension enable ipcluster nbextension disable """ version = __version__ - + user = Bool(False, help="Apply the operation only for the given user").tag( + config=True) + flags = Dict({'user': ({'IPClusterNBExtension': {'user': True}}, + 'Apply the operation only for the given user')}) + def start(self): from ipyparallel.nbextension.install import install_extensions if len(self.extra_args) != 1: @@ -597,10 +601,10 @@ action = self.extra_args[0].lower() if action == 'enable': print("Enabling IPython clusters tab") - install_extensions(enable=True) + install_extensions(enable=True, user=self.user) elif action == 'disable': print("Disabling IPython clusters tab") - install_extensions(enable=False) + install_extensions(enable=False, user=self.user) else: self.exit("Must specify 'enable' or 'disable', not '%s'" % action) @@ -626,7 +630,9 @@ def start(self): if self.subapp is None: - print("No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())) + keys = ', '.join("'{}'".format(key) for + key in self.subcommands.keys()) + print("No subcommand specified. Must specify one of: %s" % keys) print() self.print_description() self.print_subcommands() @@ -638,4 +644,3 @@ if __name__ == '__main__': launch_new_instance() - diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/ipengineapp.py new/ipyparallel-5.2.0/ipyparallel/apps/ipengineapp.py --- old/ipyparallel-5.1.1/ipyparallel/apps/ipengineapp.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/apps/ipengineapp.py 2016-08-08 14:08:17.000000000 +0200 @@ -254,7 +254,12 @@ app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port) app.log.debug("shell ROUTER Channel on port: %i", app.shell_port) - app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port) + iopub_socket = kernel.iopub_socket + # ipykernel 4.3 iopub_socket is an IOThread wrapper: + if hasattr(iopub_socket, 'socket'): + iopub_socket = iopub_socket.socket + + app.iopub_port = app._bind_socket(iopub_socket, app.iopub_port) app.log.debug("iopub PUB Channel on port: %i", app.iopub_port) kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/launcher.py new/ipyparallel-5.2.0/ipyparallel/apps/launcher.py --- old/ipyparallel-5.1.1/ipyparallel/apps/launcher.py 2016-06-22 02:28:45.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/apps/launcher.py 2016-07-06 13:00:57.000000000 +0200 @@ -214,14 +214,14 @@ controller_cmd = List(ipcontroller_cmd_argv, config=True, help="""Popen command to launch ipcontroller.""") # Command line arguments to ipcontroller. - controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True, + controller_args = List(['--log-level=%i' % logging.INFO], config=True, help="""command-line args to pass to ipcontroller""") class EngineMixin(ClusterAppMixin): engine_cmd = List(ipengine_cmd_argv, config=True, help="""command to launch the Engine.""") # Command line arguments for ipengine. - engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True, + engine_args = List(['--log-level=%i' % logging.INFO], config=True, help="command-line arguments to pass to ipengine" ) @@ -1195,7 +1195,7 @@ default_template= Unicode("""#!/bin/sh #PBS -V #PBS -N ipcontroller -%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv)))) def start(self): @@ -1300,7 +1300,7 @@ default_template= Unicode("""#!/bin/sh #SBATCH --job-name=ipy-controller-{cluster_id} #SBATCH --ntasks=1 -%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv)))) def start(self): @@ -1335,7 +1335,7 @@ default_template= Unicode(u"""#$ -V #$ -S /bin/sh #$ -N ipcontroller -%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv)))) def start(self): @@ -1400,7 +1400,7 @@ #BSUB -J ipcontroller #BSUB -oo ipcontroller.o.%%J #BSUB -eo ipcontroller.e.%%J - %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" + %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv)))) def start(self): @@ -1481,7 +1481,7 @@ executable = ipcontroller # by default we expect a shared file system transfer_executable = False -arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}' +arguments = '--profile-dir={profile_dir}' --cluster-id='{cluster_id}' """) def start(self): @@ -1498,7 +1498,7 @@ executable = ipengine # by default we expect a shared file system transfer_executable = False -arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'" +arguments = " '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'" """) @@ -1513,7 +1513,7 @@ ipcluster_cmd = List(ipcluster_cmd_argv, config=True, help="Popen command for ipcluster") ipcluster_args = List( - ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True, + ['--clean-logs=True', '--log-level=%i' % logging.INFO], config=True, help="Command line arguments to pass to ipcluster.") ipcluster_subcommand = Unicode('start') profile = Unicode('default') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/winhpcjob.py new/ipyparallel-5.2.0/ipyparallel/apps/winhpcjob.py --- old/ipyparallel-5.1.1/ipyparallel/apps/winhpcjob.py 2016-06-22 02:28:45.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/apps/winhpcjob.py 2016-07-06 13:00:57.000000000 +0200 @@ -252,7 +252,7 @@ task_name = Unicode('IPController', config=True) controller_cmd = List(['ipcontroller.exe'], config=True) - controller_args = List(['--log-to-file', '--log-level=40'], config=True) + controller_args = List(['--log-level=40'], config=True) # I don't want these to be configurable std_out_file_path = Unicode('', config=False) std_err_file_path = Unicode('', config=False) @@ -280,7 +280,7 @@ task_name = Unicode('IPEngine', config=True) engine_cmd = List(['ipengine.exe'], config=True) - engine_args = List(['--log-to-file', '--log-level=40'], config=True) + engine_args = List(['--log-level=40'], config=True) # I don't want these to be configurable std_out_file_path = Unicode('', config=False) std_err_file_path = Unicode('', config=False) @@ -304,17 +304,3 @@ return ' '.join(self.engine_cmd + self.engine_args) -# j = WinHPCJob(None) -# j.job_name = 'IPCluster' -# j.username = 'GNET\\bgranger' -# j.requested_nodes = 'GREEN' -# -# t = WinHPCTask(None) -# t.task_name = 'Controller' -# t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10" -# t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default" -# t.std_out_file_path = 'controller-out.txt' -# t.std_err_file_path = 'controller-err.txt' -# t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages" -# j.add_task(t) - diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/asyncresult.py new/ipyparallel-5.2.0/ipyparallel/client/asyncresult.py --- old/ipyparallel-5.1.1/ipyparallel/client/asyncresult.py 2016-06-24 14:48:07.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/client/asyncresult.py 2016-08-08 14:55:03.000000000 +0200 @@ -386,9 +386,9 @@ evt = Event() for child in self._children: self._wait_for_child(child, evt=evt) - results = child.result() - error.collect_exceptions([results], self._fname) - yield results + result = child.result() + error.collect_exceptions([result], self._fname) + yield result else: # already done for r in rlist: @@ -678,15 +678,11 @@ for use in iterator methods """ rlist = child.result() + if not isinstance(rlist, list): + rlist = [rlist] error.collect_exceptions(rlist, self._fname) - try: - for r in rlist: - yield r - except TypeError: - # flattened, not a list - # this could get broken by flattened data that returns iterables - # but most calls to map do not expose the `flatten` argument - yield rlist + for r in rlist: + yield r # asynchronous ordered iterator: def _ordered_iter(self): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/client.py new/ipyparallel-5.2.0/ipyparallel/client/client.py --- old/ipyparallel-5.1.1/ipyparallel/client/client.py 2016-06-24 16:15:02.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/client/client.py 2016-08-14 19:49:09.000000000 +0200 @@ -5,6 +5,8 @@ from __future__ import print_function +import collections +from concurrent.futures import Future import os import json from threading import Thread, Event, current_thread @@ -20,7 +22,6 @@ import zmq from zmq.eventloop.ioloop import IOLoop from zmq.eventloop.zmqstream import ZMQStream -from tornado.concurrent import Future from tornado.gen import multi_future from traitlets.config.configurable import MultipleInstanceError @@ -221,6 +222,11 @@ raise KeyError(key) +def _is_future(f): + """light duck-typing check for Futures""" + return hasattr(f, 'add_done_callback') + + class Client(HasTraits): """A semi-synchronous client to an IPython parallel cluster @@ -1112,10 +1118,15 @@ True : when all msg_ids are done False : timeout reached, some msg_ids still outstanding """ + futures = [] if jobs is None: - theids = self.outstanding + if not self.outstanding: + return True + # make a copy, so that we aren't passing a mutable collection to _futures_for_msgs + theids = set(self.outstanding) else: - if isinstance(jobs, string_types + (int, AsyncResult)): + if isinstance(jobs, string_types + (int, AsyncResult)) \ + or not isinstance(jobs, collections.Iterable): jobs = [jobs] theids = set() for job in jobs: @@ -1125,11 +1136,14 @@ elif isinstance(job, AsyncResult): theids.update(job.msg_ids) continue + elif _is_future(job): + futures.append(job) + continue theids.add(job) - if not theids.intersection(self.outstanding): - return True + if not futures and not theids.intersection(self.outstanding): + return True - futures = self._futures_for_msgs(theids) + futures.extend(self._futures_for_msgs(theids)) return self._await_futures(futures, timeout) def wait_interactive(self, jobs=None, interval=1., timeout=-1.): @@ -1307,9 +1321,11 @@ worker_args['ip'] = distributed_info['ip'] worker_args['port'] = distributed_info['port'] worker_args['nanny'] = nanny + # set default ncores=1, since that's how an IPython cluster is typically set up. + worker_args.setdefault('ncores', 1) + dview.apply_sync(util.become_distributed_worker, **worker_args) # Finally, return an Executor connected to the Scheduler - dview.apply_sync(util.become_distributed_worker, **worker_args) executor = distributed.Executor('{ip}:{port}'.format(**distributed_info)) return executor diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/futures.py new/ipyparallel-5.2.0/ipyparallel/client/futures.py --- old/ipyparallel-5.1.1/ipyparallel/client/futures.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/client/futures.py 2016-08-14 19:49:09.000000000 +0200 @@ -4,7 +4,7 @@ # Distributed under the terms of the Modified BSD License. from threading import Event -from tornado.concurrent import Future +from concurrent.futures import Future class MessageFuture(Future): """Future class to wrap async messages""" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/remotefunction.py new/ipyparallel-5.2.0/ipyparallel/client/remotefunction.py --- old/ipyparallel-5.1.1/ipyparallel/client/remotefunction.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/client/remotefunction.py 2016-08-14 19:49:09.000000000 +0200 @@ -12,6 +12,7 @@ from . import map as Map from .asyncresult import AsyncMapResult +from IPython.utils.signatures import signature #----------------------------------------------------------------------------- # Functions and Decorators @@ -113,6 +114,22 @@ self.block=block self.flags=flags + # copy function attributes for nicer inspection + # of decorated functions + self.__name__ = getname(f) + if getattr(f, '__doc__', None): + self.__doc__ = '{} wrapping:\n{}'.format( + self.__class__.__name__, f.__doc__, + ) + if getattr(f, '__signature__', None): + self.__signature__ = f.__signature__ + else: + try: + self.__signature__ = signature(f) + except Exception: + # no signature, but that's okay + pass + def __call__(self, *args, **kwargs): block = self.view.block if self.block is None else self.block with self.view.temp_flags(block=block, **self.flags): @@ -155,7 +172,6 @@ chunksize = None ordered = None mapObject = None - _mapping = False def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): super(ParallelFunction, self).__init__(view, f, block=block, **flags) @@ -166,8 +182,11 @@ self.mapObject = mapClass() @sync_view_results - def __call__(self, *sequences): + def __call__(self, *sequences, **kwargs): client = self.view.client + _mapping = kwargs.pop('__ipp_mapping', False) + if kwargs: + raise TypeError("Unexpected keyword arguments: %s" % kwargs) lens = [] maxlen = minlen = -1 @@ -192,7 +211,7 @@ return [] # check that the length of sequences match - if not self._mapping and minlen != maxlen: + if not _mapping and minlen != maxlen: msg = 'all sequences must have equal length, but have %s' % lens raise ValueError(msg) @@ -226,7 +245,7 @@ if sum([len(arg) for arg in args]) == 0: continue - if self._mapping: + if _mapping: if sys.version_info[0] >= 3: f = lambda f, *sequences: list(map(f, *sequences)) else: @@ -263,12 +282,6 @@ That means it can take generators (will be cast to lists locally), and mismatched sequence lengths will be padded with None. """ - # set _mapping as a flag for use inside self.__call__ - self._mapping = True - try: - ret = self(*sequences) - finally: - self._mapping = False - return ret + return self(*sequences, __ipp_mapping=True) __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/controller/hub.py new/ipyparallel-5.2.0/ipyparallel/controller/hub.py --- old/ipyparallel-5.1.1/ipyparallel/controller/hub.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/controller/hub.py 2016-08-08 14:55:03.000000000 +0200 @@ -426,7 +426,7 @@ b'tracktask': self.save_task_destination, b'incontrol': _passer, b'outcontrol': _passer, - b'iopub': self.save_iopub_message, + b'iopub': self.monitor_iopub_message, } self.query_handlers = {'queue_request': self.queue_status, @@ -833,15 +833,27 @@ #--------------------- IOPub Traffic ------------------------------ - def save_iopub_message(self, topics, msg): - """save an iopub message into the db""" - # print (topics) + def monitor_iopub_message(self, topics, msg): + '''intercept iopub traffic so events can be acted upon''' try: msg = self.session.deserialize(msg, content=True) except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return + msg_type = msg['header']['msg_type'] + if msg_type == 'shutdown_reply': + session = msg['header']['session'] + eid = self.by_ident.get(session, None) + uuid = self.engines[eid].uuid + self.unregister_engine(ident='shutdown_reply', + msg=dict(content=dict(id=eid, queue=uuid))) + + if msg_type not in ('status', 'shutdown_reply', ): + self.save_iopub_message(topics, msg) + + def save_iopub_message(self, topics, msg): + """save an iopub message into the db""" parent = msg['parent_header'] if not parent: self.log.debug("iopub::IOPub message lacks parent: %r", msg) @@ -862,15 +874,12 @@ name = content['name'] s = '' if rec is None else rec[name] d[name] = s + content['text'] - elif msg_type == 'error': d['error'] = content elif msg_type == 'execute_input': d['execute_input'] = content['code'] elif msg_type in ('display_data', 'execute_result'): d[msg_type] = content - elif msg_type == 'status': - pass elif msg_type == 'data_pub': self.log.info("ignored data_pub message for %s" % msg_id) else: @@ -985,6 +994,11 @@ content=dict(id=eid, uuid=uuid) self.dead_engines.add(uuid) + #stop the heartbeats + self.hearts.pop(uuid, None) + self.heartmonitor.responses.discard(uuid) + self.heartmonitor.hearts.discard(uuid) + self.loop.add_timeout( self.loop.time() + self.registration_timeout, lambda : self._handle_stranded_msgs(eid, uuid), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/engine/kernel.py new/ipyparallel-5.2.0/ipyparallel/engine/kernel.py --- old/ipyparallel-5.1.1/ipyparallel/engine/kernel.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/engine/kernel.py 2016-08-10 16:10:22.000000000 +0200 @@ -3,7 +3,7 @@ import sys from datetime import datetime -from ipython_genutils.py3compat import cast_bytes, cast_unicode_py2 +from ipython_genutils.py3compat import cast_bytes, cast_unicode_py2, unicode_type, safe_unicode from traitlets import Integer, Type from ipykernel.ipkernel import IPythonKernel @@ -24,8 +24,7 @@ base = "engine.%s" % self.engine_id return cast_bytes("%s.%s" % (base, topic)) - - + def __init__(self, **kwargs): super(IPythonParallelKernel, self).__init__(**kwargs) # add apply_request, in anticipation of upstream deprecation @@ -35,7 +34,7 @@ self.shell.configurables.append(data_pub) data_pub.session = self.session data_pub.pub_socket = self.iopub_socket - + def init_metadata(self, parent): """init metadata dict, for execute/apply_reply""" return { @@ -165,21 +164,33 @@ item_threshold=self.session.item_threshold, ) - except: + except BaseException as e: # invoke IPython traceback formatting shell.showtraceback() - # FIXME - fish exception info out of shell, possibly left there by - # run_code. We'll need to clean up this logic later. - reply_content = {} - if shell._reply_content is not None: - reply_content.update(shell._reply_content) - # reset after use - shell._reply_content = None + reply_content = { + 'traceback': [], + 'ename': unicode_type(type(e).__name__), + 'evalue': safe_unicode(e), + } + # get formatted traceback, which ipykernel recorded + if hasattr(shell, '_last_traceback'): + # ipykernel 4.4 + reply_content['traceback'] = shell._last_traceback or [] + elif hasattr(shell, '_reply_content'): + # ipykernel <= 4.3 + if shell._reply_content and 'traceback' in shell._reply_content: + reply_content['traceback'] = shell._reply_content['traceback'] + else: + self.log.warning("Didn't find a traceback where I expected to") + shell._last_traceback = None + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply') + reply_content['engine_info'] = e_info self.send_response(self.iopub_socket, u'error', reply_content, ident=self._topic('error')) self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) result_buf = [] + reply_content['status'] = 'error' else: reply_content = {'status' : 'ok'} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/nbextension/install.py new/ipyparallel-5.2.0/ipyparallel/nbextension/install.py --- old/ipyparallel-5.1.1/ipyparallel/nbextension/install.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/nbextension/install.py 2016-08-08 14:08:17.000000000 +0200 @@ -8,23 +8,23 @@ from notebook.services.config import ConfigManager as FrontendConfigManager -def install_extensions(enable=True): +def install_extensions(enable=True, user=False): """Register ipyparallel clusters tab as notebook extensions - + Toggle with enable=True/False. """ from distutils.version import LooseVersion as V import notebook - + if V(notebook.__version__) < V('4.2'): return _install_extension_nb41(enable) - + from notebook.nbextensions import install_nbextension_python, enable_nbextension, disable_nbextension from notebook.serverextensions import toggle_serverextension_python - toggle_serverextension_python('ipyparallel.nbextension') - install_nbextension_python('ipyparallel') + toggle_serverextension_python('ipyparallel.nbextension', user=user) + install_nbextension_python('ipyparallel', user=user) if enable: - enable_nbextension('tree', 'ipyparallel/main') + enable_nbextension('tree', 'ipyparallel/main', user=user) else: disable_nbextension('tree', 'ipyparallel/main') @@ -49,7 +49,7 @@ 'server_extensions': server_extensions, } }) - + # frontend config (*way* easier because it's a dict) frontend = FrontendConfigManager() frontend.update('tree', { diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/tests/test_asyncresult.py new/ipyparallel-5.2.0/ipyparallel/tests/test_asyncresult.py --- old/ipyparallel-5.1.1/ipyparallel/tests/test_asyncresult.py 2016-06-24 14:48:07.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/tests/test_asyncresult.py 2016-08-10 16:09:22.000000000 +0200 @@ -91,6 +91,16 @@ for r in ar: self.assertEqual(r, 0.125) + def test_iter_error(self): + amr = self.client[:].map_async(lambda x: 1/(x-2), range(5)) + # iterating through failing AMR should raise RemoteError + self.assertRaisesRemote(ZeroDivisionError, list, amr) + # so should get + self.assertRaisesRemote(ZeroDivisionError, amr.get) + amr.wait(10) + # test iteration again after everything is local + self.assertRaisesRemote(ZeroDivisionError, list, amr) + def test_getattr(self): ar = self.client[:].apply_async(wait, 0.5) self.assertEqual(ar.engine_id, [None] * len(ar)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/tests/test_client.py new/ipyparallel-5.2.0/ipyparallel/tests/test_client.py --- old/ipyparallel-5.1.1/ipyparallel/tests/test_client.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/ipyparallel/tests/test_client.py 2016-08-14 19:49:09.000000000 +0200 @@ -5,10 +5,14 @@ from __future__ import division +from concurrent.futures import Future from datetime import datetime import os +from threading import Thread import time +from tornado.concurrent import Future as TornadoFuture + from IPython import get_ipython from ipyparallel.client import client as clientmod from ipyparallel import error, AsyncHubResult, DirectView, Reference @@ -525,7 +529,21 @@ ar = self.client[-1].apply_async(lambda : 1) self.client.wait_interactive() self.assertEqual(self.client.outstanding, set()) - + + def test_await_future(self): + f = Future() + tf = TornadoFuture() + def finish_later(): + time.sleep(0.1) + f.set_result('future') + tf.set_result('tornado') + Thread(target=finish_later).start() + assert self.client.wait([f, tf]) + assert f.done() + assert tf.done() + assert f.result() == 'future' + assert tf.result() == 'tornado' + @skip_without('distributed') def test_become_distributed(self): executor = self.client.become_distributed() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/tests/test_remotefunction.py new/ipyparallel-5.2.0/ipyparallel/tests/test_remotefunction.py --- old/ipyparallel-5.1.1/ipyparallel/tests/test_remotefunction.py 1970-01-01 01:00:00.000000000 +0100 +++ new/ipyparallel-5.2.0/ipyparallel/tests/test_remotefunction.py 2016-08-14 19:49:09.000000000 +0200 @@ -0,0 +1,60 @@ +"""Tests for remote functions""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import division + + +import ipyparallel as ipp + +from .clienttest import ClusterTestCase, add_engines + +def setup(): + add_engines(2, total=True) + +class TestRemoteFunctions(ClusterTestCase): + + def test_remote(self): + v = self.client[-1] + @ipp.remote(v, block=True) + def foo(x, y=5): + """multiply x * y""" + return x * y + self.assertEqual(foo.__name__, 'foo') + self.assertIn('RemoteFunction', foo.__doc__) + self.assertIn('multiply x', foo.__doc__) + + z = foo(5) + self.assertEqual(z, 25) + z = foo(2, 3) + self.assertEqual(z, 6) + z = foo(x=5, y=2) + self.assertEqual(z, 10) + + def test_parallel(self): + n = 2 + v = self.client[:n] + @ipp.parallel(v, block=True) + def foo(x): + """multiply x * y""" + return x * 2 + self.assertEqual(foo.__name__, 'foo') + self.assertIn('ParallelFunction', foo.__doc__) + self.assertIn('multiply x', foo.__doc__) + + z = foo([1, 2, 3, 4]) + self.assertEqual(z, [1, 2, 1, 2, 3, 4, 3, 4]) + + def test_parallel_map(self): + v = self.client.load_balanced_view() + @ipp.parallel(v, block=True) + def foo(x, y=5): + """multiply x * y""" + return x * y + z = foo.map([1, 2, 3]) + self.assertEqual(z, [5, 10, 15]) + z = foo.map([1, 2, 3], [1, 2, 3]) + self.assertEqual(z, [1, 4, 9]) + + \ No newline at end of file diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/setup.py new/ipyparallel-5.2.0/setup.py --- old/ipyparallel-5.1.1/setup.py 2016-06-22 02:20:14.000000000 +0200 +++ new/ipyparallel-5.2.0/setup.py 2016-08-14 19:49:09.000000000 +0200 @@ -123,13 +123,14 @@ extras_require = setuptools_args['extras_require'] = { ':python_version == "2.7"': ['futures'], 'nbext': ["notebook"], + 'test': [ + 'nose', + 'ipython[test]', + 'testpath', + 'mock', + ], } -tests_require = setuptools_args['tests_require'] = [ - 'nose', - 'ipython[test]', - 'mock', -] if 'setuptools' in sys.modules: setup_args.update(setuptools_args)
participants (1)
-
root@hilbert.suse.de