Hello community, here is the log from the commit of package buildstream for openSUSE:Factory checked in at 2018-10-08 17:44:44 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/buildstream (Old) and /work/SRC/openSUSE:Factory/.buildstream.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Package is "buildstream" Mon Oct 8 17:44:44 2018 rev:8 rq:640109 version:1.2.3 Changes: -------- --- /work/SRC/openSUSE:Factory/buildstream/buildstream.changes 2018-10-01 08:17:53.089949931 +0200 +++ /work/SRC/openSUSE:Factory/.buildstream.new/buildstream.changes 2018-10-08 17:46:55.678368921 +0200 @@ -1,0 +2,18 @@ +Fri Oct 5 13:20:12 UTC 2018 - bjorn.lie@gmail.com + +- Update to version 1.2.3: + + Fixed an unhandled exception when cleaning up a build sandbox. + + Fixed race condition when calculating cache size and commiting + artifacts. + + Fixed regression where terminating with `^C` results in a + double user interrogation. + + Fixed regression in summary when builds are terminated. + + Fixed regression where irrelevant status messages appear from + git sources. + + Improve performance of artifact uploads by batching file + transfers. + + Fixed performance of artifact downloads by batching file + transfers. + + Fixed checks for paths which escape the project directory. + +------------------------------------------------------------------- Old: ---- BuildStream-1.2.2.tar.xz New: ---- BuildStream-1.2.3.tar.xz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ buildstream.spec ++++++ --- /var/tmp/diff_new_pack.eRIa0G/_old 2018-10-08 17:46:56.974367269 +0200 +++ /var/tmp/diff_new_pack.eRIa0G/_new 2018-10-08 17:46:56.978367265 +0200 @@ -18,7 +18,7 @@ %define real_name BuildStream Name: buildstream -Version: 1.2.2 +Version: 1.2.3 Release: 0 Summary: A framework for modelling build pipelines in YAML License: LGPL-2.1-or-later ++++++ BuildStream-1.2.2.tar.xz -> BuildStream-1.2.3.tar.xz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/.gitlab-ci.yml new/BuildStream-1.2.3/.gitlab-ci.yml --- old/BuildStream-1.2.2/.gitlab-ci.yml 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/.gitlab-ci.yml 2018-10-04 16:40:53.000000000 +0200 @@ -84,6 +84,8 @@ - cd ../.. - mkdir -p coverage-linux/ - cp dist/buildstream/.coverage.* coverage-linux/coverage."${CI_JOB_NAME}" + except: + - schedules artifacts: paths: - coverage-linux/ @@ -132,6 +134,8 @@ - cd ../.. - mkdir -p coverage-unix/ - cp dist/buildstream/.coverage.* coverage-unix/coverage.unix + except: + - schedules artifacts: paths: - coverage-unix/ @@ -153,10 +157,41 @@ - make BST_FORCE_SESSION_REBUILD=1 -C doc - cd ../.. - mv dist/buildstream/doc/build/html public + except: + - schedules artifacts: paths: - public/ +.overnight-tests: &overnight-tests-template + stage: test + variables: + bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git + bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1 + fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12 + before_script: + - (cd dist && ./unpack.sh && cd buildstream && pip3 install .) 
+ - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext + - git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git + - git -C freedesktop-sdk checkout ${fd_sdk_ref} + only: + - schedules + +overnight-tests: + <<: *overnight-tests-template + script: + - make -C freedesktop-sdk + tags: + - overnight-tests + +overnight-tests-no-cache: + <<: *overnight-tests-template + script: + - sed -i '/artifacts:/,+1 d' freedesktop-sdk/bootstrap/project.conf + - sed -i '/artifacts:/,+1 d' freedesktop-sdk/project.conf + - make -C freedesktop-sdk + tags: + - overnight-tests # Check code quality with gitlab's built-in feature. # @@ -175,6 +210,8 @@ --volume "$PWD":/code --volume /var/run/docker.sock:/var/run/docker.sock "registry.gitlab.com/gitlab-org/security-products/codequality:$SP_VERSION" /code + except: + - schedules artifacts: paths: [gl-code-quality-report.json] @@ -204,6 +241,8 @@ radon raw -s -j buildstream > analysis/raw.json radon raw -s buildstream + except: + - schedules artifacts: paths: - analysis/ @@ -229,6 +268,8 @@ - tests-fedora-28 - tests-unix - source_dist + except: + - schedules # Deploy, only for merges which land on master branch. # @@ -253,3 +294,5 @@ # See https://gitlab.com/gitlab-org/gitlab-ce/issues/35141 # - master + except: + - schedules diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/BuildStream.egg-info/PKG-INFO new/BuildStream-1.2.3/BuildStream.egg-info/PKG-INFO --- old/BuildStream-1.2.2/BuildStream.egg-info/PKG-INFO 2018-09-24 16:35:50.000000000 +0200 +++ new/BuildStream-1.2.3/BuildStream.egg-info/PKG-INFO 2018-10-05 09:09:02.000000000 +0200 @@ -1,14 +1,14 @@ Metadata-Version: 2.1 Name: BuildStream -Version: 1.2.2 +Version: 1.2.3 Summary: A framework for modelling build pipelines in YAML Home-page: https://gitlab.com/BuildStream/buildstream Author: BuildStream Developers Author-email: buildstream-list@gnome.org License: LGPL Project-URL: Documentation, https://buildstream.gitlab.io/buildstream/ -Project-URL: Tracker, https://gitlab.com/BuildStream/buildstream/issues Project-URL: Mailing List, https://mail.gnome.org/mailman/listinfo/buildstream-list +Project-URL: Tracker, https://gitlab.com/BuildStream/buildstream/issues Description: About ----- .. 
image:: https://gitlab.com/BuildStream/buildstream/badges/master/pipeline.svg diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/NEWS new/BuildStream-1.2.3/NEWS --- old/BuildStream-1.2.2/NEWS 2018-09-24 16:28:19.000000000 +0200 +++ new/BuildStream-1.2.3/NEWS 2018-10-05 09:01:35.000000000 +0200 @@ -1,4 +1,24 @@ ================= +buildstream 1.2.3 +================= + + o Fixed an unhandled exception when cleaning up a build sandbox (#153) + + o Fixed race condition when calculating cache size and commiting artifacts + + o Fixed regression where terminating with `^C` results in a double user interrogation (#693) + + o Fixed regression in summary when builds are terminated (#479) + + o Fixed regression where irrelevant status messages appear from git sources + + o Improve performance of artifact uploads by batching file transfers (#676/#677) + + o Fixed performance of artifact downloads by batching file transfers (#554) + + o Fixed checks for paths which escape the project directory (#673) + +================= buildstream 1.2.2 ================= diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/PKG-INFO new/BuildStream-1.2.3/PKG-INFO --- old/BuildStream-1.2.2/PKG-INFO 2018-09-24 16:35:50.000000000 +0200 +++ new/BuildStream-1.2.3/PKG-INFO 2018-10-05 09:09:03.000000000 +0200 @@ -1,14 +1,14 @@ Metadata-Version: 2.1 Name: BuildStream -Version: 1.2.2 +Version: 1.2.3 Summary: A framework for modelling build pipelines in YAML Home-page: https://gitlab.com/BuildStream/buildstream Author: BuildStream Developers Author-email: buildstream-list@gnome.org License: LGPL Project-URL: Documentation, https://buildstream.gitlab.io/buildstream/ -Project-URL: Tracker, https://gitlab.com/BuildStream/buildstream/issues Project-URL: Mailing List, https://mail.gnome.org/mailman/listinfo/buildstream-list +Project-URL: Tracker, https://gitlab.com/BuildStream/buildstream/issues Description: About ----- .. image:: https://gitlab.com/BuildStream/buildstream/badges/master/pipeline.svg diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_artifactcache/artifactcache.py new/BuildStream-1.2.3/buildstream/_artifactcache/artifactcache.py --- old/BuildStream-1.2.2/buildstream/_artifactcache/artifactcache.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_artifactcache/artifactcache.py 2018-10-04 16:40:53.000000000 +0200 @@ -277,7 +277,7 @@ "Please increase the cache-quota in {}." .format(self.context.config_origin or default_conf)) - if self.get_quota_exceeded(): + if self.has_quota_exceeded(): raise ArtifactError("Cache too full. Aborting.", detail=detail, reason="cache-too-full") @@ -364,14 +364,14 @@ self._cache_size = cache_size self._write_cache_size(self._cache_size) - # get_quota_exceeded() + # has_quota_exceeded() # # Checks if the current artifact cache size exceeds the quota. 
# # Returns: # (bool): True of the quota is exceeded # - def get_quota_exceeded(self): + def has_quota_exceeded(self): return self.get_cache_size() > self._cache_quota ################################################ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_artifactcache/cascache.py new/BuildStream-1.2.3/buildstream/_artifactcache/cascache.py --- old/BuildStream-1.2.2/buildstream/_artifactcache/cascache.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_artifactcache/cascache.py 2018-10-04 16:40:53.000000000 +0200 @@ -43,6 +43,11 @@ from . import ArtifactCache +# The default limit for gRPC messages is 4 MiB. +# Limit payload to 1 MiB to leave sufficient headroom for metadata. +_MAX_PAYLOAD_BYTES = 1024 * 1024 + + # A CASCache manages artifacts in a CAS repository as specified in the # Remote Execution API. # @@ -76,6 +81,7 @@ ################################################ # Implementation of abstract methods # ################################################ + def contains(self, element, key): refpath = self._refpath(self.get_artifact_fullname(element, key)) @@ -115,7 +121,7 @@ def commit(self, element, content, keys): refs = [self.get_artifact_fullname(element, key) for key in keys] - tree = self._create_tree(content) + tree = self._commit_directory(content) for ref in refs: self.set_ref(ref, tree) @@ -151,6 +157,7 @@ q = multiprocessing.Queue() for remote_spec in remote_specs: # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) try: @@ -263,109 +270,69 @@ self.set_ref(newref, tree) + def _push_refs_to_remote(self, refs, remote): + skipped_remote = True + try: + for ref in refs: + tree = self.resolve_ref(ref) + + # Check whether ref is already on the server in which case + # there is no need to push the artifact + try: + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + response = remote.ref_storage.GetReference(request) + + if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: + # ref is already on the server with the same tree + continue + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + # Intentionally re-raise RpcError for outer except block. 
+ raise + + self._send_directory(remote, tree) + + request = buildstream_pb2.UpdateReferenceRequest() + request.keys.append(ref) + request.digest.hash = tree.hash + request.digest.size_bytes = tree.size_bytes + remote.ref_storage.UpdateReference(request) + + skipped_remote = False + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: + raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + + return not skipped_remote + def push(self, element, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] + + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] project = element._get_project() push_remotes = [r for r in self._remotes[project] if r.spec.push] pushed = False - display_key = element._get_brief_display_key() + for remote in push_remotes: remote.init() - skipped_remote = True + display_key = element._get_brief_display_key() element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - try: - for ref in refs: - tree = self.resolve_ref(ref) - - # Check whether ref is already on the server in which case - # there is no need to push the artifact - try: - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - response = remote.ref_storage.GetReference(request) - - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: - # ref is already on the server with the same tree - continue - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - # Intentionally re-raise RpcError for outer except block. - raise - - missing_blobs = {} - required_blobs = self._required_blobs(tree) - - # Limit size of FindMissingBlobs request - for required_blobs_group in _grouper(required_blobs, 512): - request = remote_execution_pb2.FindMissingBlobsRequest() - - for required_digest in required_blobs_group: - d = request.blob_digests.add() - d.hash = required_digest.hash - d.size_bytes = required_digest.size_bytes - - response = remote.cas.FindMissingBlobs(request) - for digest in response.missing_blob_digests: - d = remote_execution_pb2.Digest() - d.hash = digest.hash - d.size_bytes = digest.size_bytes - missing_blobs[d.hash] = d - - # Upload any blobs missing on the server - skipped_remote = False - for digest in missing_blobs.values(): - uuid_ = uuid.uuid4() - resource_name = '/'.join(['uploads', str(uuid_), 'blobs', - digest.hash, str(digest.size_bytes)]) - - def request_stream(resname): - with open(self.objpath(digest), 'rb') as f: - assert os.fstat(f.fileno()).st_size == digest.size_bytes - offset = 0 - finished = False - remaining = digest.size_bytes - while not finished: - chunk_size = min(remaining, 64 * 1024) - remaining -= chunk_size - - request = bytestream_pb2.WriteRequest() - request.write_offset = offset - # max. 
64 kB chunks - request.data = f.read(chunk_size) - request.resource_name = resname - request.finish_write = remaining <= 0 - yield request - offset += chunk_size - finished = request.finish_write - response = remote.bytestream.Write(request_stream(resource_name)) - - request = buildstream_pb2.UpdateReferenceRequest() - request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes - remote.ref_storage.UpdateReference(request) - - pushed = True - - if not skipped_remote: - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e - - if skipped_remote: + if self._push_refs_to_remote(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: self.context.message(Message( None, MessageType.INFO, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) + return pushed ################################################ @@ -451,7 +418,7 @@ def set_ref(self, ref, tree): refpath = self._refpath(ref) os.makedirs(os.path.dirname(refpath), exist_ok=True) - with utils.save_file_atomic(refpath, 'wb') as f: + with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f: f.write(tree.SerializeToString()) # resolve_ref(): @@ -594,6 +561,7 @@ ################################################ # Local Private Methods # ################################################ + def _checkout(self, dest, tree): os.makedirs(dest, exist_ok=True) @@ -623,7 +591,21 @@ def _refpath(self, ref): return os.path.join(self.casdir, 'refs', 'heads', ref) - def _create_tree(self, path, *, digest=None): + # _commit_directory(): + # + # Adds local directory to content addressable store. + # + # Adds files, symbolic links and recursively other directories in + # a local directory to the content addressable store. + # + # Args: + # path (str): Path to the directory to add. + # dir_digest (Digest): An optional Digest object to use. + # + # Returns: + # (Digest): Digest object for the directory added. 
+ # + def _commit_directory(self, path, *, dir_digest=None): directory = remote_execution_pb2.Directory() for name in sorted(os.listdir(path)): @@ -632,7 +614,7 @@ if stat.S_ISDIR(mode): dirnode = directory.directories.add() dirnode.name = name - self._create_tree(full_path, digest=dirnode.digest) + self._commit_directory(full_path, dir_digest=dirnode.digest) elif stat.S_ISREG(mode): filenode = directory.files.add() filenode.name = name @@ -645,7 +627,8 @@ else: raise ArtifactError("Unsupported file type for {}".format(full_path)) - return self.add_object(digest=digest, buffer=directory.SerializeToString()) + return self.add_object(digest=dir_digest, + buffer=directory.SerializeToString()) def _get_subdir(self, tree, subdir): head, name = os.path.split(subdir) @@ -756,16 +739,16 @@ # q.put(str(e)) - def _required_blobs(self, tree): + def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() - d.hash = tree.hash - d.size_bytes = tree.size_bytes + d.hash = directory_digest.hash + d.size_bytes = directory_digest.size_bytes yield d directory = remote_execution_pb2.Directory() - with open(self.objpath(tree), 'rb') as f: + with open(self.objpath(directory_digest), 'rb') as f: directory.ParseFromString(f.read()) for filenode in directory.files: @@ -777,50 +760,203 @@ for dirnode in directory.directories: yield from self._required_blobs(dirnode.digest) - def _fetch_blob(self, remote, digest, out): + def _fetch_blob(self, remote, digest, stream): resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) request = bytestream_pb2.ReadRequest() request.resource_name = resource_name request.read_offset = 0 for response in remote.bytestream.Read(request): - out.write(response.data) + stream.write(response.data) + stream.flush() - out.flush() - assert digest.size_bytes == os.fstat(out.fileno()).st_size + assert digest.size_bytes == os.fstat(stream.fileno()).st_size - def _fetch_directory(self, remote, tree): - objpath = self.objpath(tree) + # _ensure_blob(): + # + # Fetch and add blob if it's not already local. + # + # Args: + # remote (Remote): The remote to use. + # digest (Digest): Digest object for the blob to fetch. + # + # Returns: + # (str): The path of the object + # + def _ensure_blob(self, remote, digest): + objpath = self.objpath(digest) if os.path.exists(objpath): - # already in local cache - return + # already in local repository + return objpath - with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out: - self._fetch_blob(remote, tree, out) + with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f: + self._fetch_blob(remote, digest, f) - directory = remote_execution_pb2.Directory() + added_digest = self.add_object(path=f.name) + assert added_digest.hash == digest.hash - with open(out.name, 'rb') as f: - directory.ParseFromString(f.read()) + return objpath + + def _batch_download_complete(self, batch): + for digest, data in batch.send(): + with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f: + f.write(data) + f.flush() + + added_digest = self.add_object(path=f.name) + assert added_digest.hash == digest.hash + + # Helper function for _fetch_directory(). + def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue): + self._batch_download_complete(batch) + + # All previously scheduled directories are now locally available, + # move them to the processing queue. 
+ fetch_queue.extend(fetch_next_queue) + fetch_next_queue.clear() + return _CASBatchRead(remote) + + # Helper function for _fetch_directory(). + def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False): + in_local_cache = os.path.exists(self.objpath(digest)) - for filenode in directory.files: - fileobjpath = self.objpath(tree) - if os.path.exists(fileobjpath): - # already in local cache - continue + if in_local_cache: + # Skip download, already in local cache. + pass + elif (digest.size_bytes >= remote.max_batch_total_size_bytes or + not remote.batch_read_supported): + # Too large for batch request, download in independent request. + self._ensure_blob(remote, digest) + in_local_cache = True + else: + if not batch.add(digest): + # Not enough space left in batch request. + # Complete pending batch first. + batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) + batch.add(digest) + + if recursive: + if in_local_cache: + # Add directory to processing queue. + fetch_queue.append(digest) + else: + # Directory will be available after completing pending batch. + # Add directory to deferred processing queue. + fetch_next_queue.append(digest) + + return batch + + # _fetch_directory(): + # + # Fetches remote directory and adds it to content addressable store. + # + # Fetches files, symbolic links and recursively other directories in + # the remote directory and adds them to the content addressable + # store. + # + # Args: + # remote (Remote): The remote to use. + # dir_digest (Digest): Digest object for the directory to fetch. + # + def _fetch_directory(self, remote, dir_digest): + fetch_queue = [dir_digest] + fetch_next_queue = [] + batch = _CASBatchRead(remote) + + while len(fetch_queue) + len(fetch_next_queue) > 0: + if len(fetch_queue) == 0: + batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) - with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f: - self._fetch_blob(remote, filenode.digest, f) + dir_digest = fetch_queue.pop(0) - digest = self.add_object(path=f.name) - assert digest.hash == filenode.digest.hash + objpath = self._ensure_blob(remote, dir_digest) + + directory = remote_execution_pb2.Directory() + with open(objpath, 'rb') as f: + directory.ParseFromString(f.read()) for dirnode in directory.directories: - self._fetch_directory(remote, dirnode.digest) + batch = self._fetch_directory_node(remote, dirnode.digest, batch, + fetch_queue, fetch_next_queue, recursive=True) - # place directory blob only in final location when we've downloaded - # all referenced blobs to avoid dangling references in the repository - digest = self.add_object(path=out.name) - assert digest.hash == tree.hash + for filenode in directory.files: + batch = self._fetch_directory_node(remote, filenode.digest, batch, + fetch_queue, fetch_next_queue) + + # Fetch final batch + self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) + + def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()): + resource_name = '/'.join(['uploads', str(u_uid), 'blobs', + digest.hash, str(digest.size_bytes)]) + + def request_stream(resname, instream): + offset = 0 + finished = False + remaining = digest.size_bytes + while not finished: + chunk_size = min(remaining, _MAX_PAYLOAD_BYTES) + remaining -= chunk_size + + request = bytestream_pb2.WriteRequest() + request.write_offset = offset + # max. 
_MAX_PAYLOAD_BYTES chunks + request.data = instream.read(chunk_size) + request.resource_name = resname + request.finish_write = remaining <= 0 + + yield request + + offset += chunk_size + finished = request.finish_write + + response = remote.bytestream.Write(request_stream(resource_name, stream)) + + assert response.committed_size == digest.size_bytes + + def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): + required_blobs = self._required_blobs(digest) + + missing_blobs = dict() + # Limit size of FindMissingBlobs request + for required_blobs_group in _grouper(required_blobs, 512): + request = remote_execution_pb2.FindMissingBlobsRequest() + + for required_digest in required_blobs_group: + d = request.blob_digests.add() + d.hash = required_digest.hash + d.size_bytes = required_digest.size_bytes + + response = remote.cas.FindMissingBlobs(request) + for missing_digest in response.missing_blob_digests: + d = remote_execution_pb2.Digest() + d.hash = missing_digest.hash + d.size_bytes = missing_digest.size_bytes + missing_blobs[d.hash] = d + + # Upload any blobs missing on the server + self._send_blobs(remote, missing_blobs.values(), u_uid) + + def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()): + batch = _CASBatchUpdate(remote) + + for digest in digests: + with open(self.objpath(digest), 'rb') as f: + assert os.fstat(f.fileno()).st_size == digest.size_bytes + + if (digest.size_bytes >= remote.max_batch_total_size_bytes or + not remote.batch_update_supported): + # Too large for batch request, upload in independent request. + self._send_blob(remote, digest, f, u_uid=u_uid) + else: + if not batch.add(digest, f): + # Not enough space left in batch request. + # Complete pending batch first. + batch.send() + batch = _CASBatchUpdate(remote) + batch.add(digest, f) + + # Send final batch + batch.send() # Represents a single remote CAS cache. 
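The new helpers above all follow the same accumulate-and-flush pattern: blobs small enough for a batch request are collected until the server-advertised max_batch_total_size_bytes would be exceeded, oversized blobs fall back to the streaming ByteStream API, and any partially filled batch is flushed at the end. Below is a minimal, self-contained sketch of that grouping logic only; the 1 MiB limit mirrors _MAX_PAYLOAD_BYTES, the blob sizes are invented, and the real code issues gRPC BatchReadBlobs/BatchUpdateBlobs calls rather than yielding lists.

    # Size-capped batching as used for artifact uploads and downloads (illustrative only).
    MAX_BATCH_BYTES = 1024 * 1024  # mirrors _MAX_PAYLOAD_BYTES

    def batch_by_size(sizes, limit=MAX_BATCH_BYTES):
        # Group blob sizes into batches that stay within the payload limit;
        # anything at or above the limit goes on its own (streamed in BuildStream).
        batch, total = [], 0
        for size in sizes:
            if size >= limit:
                yield [size]              # too large for a batch request
                continue
            if total + size > limit:
                yield batch               # flush the full batch first
                batch, total = [], 0
            batch.append(size)
            total += size
        if batch:
            yield batch                   # final, partially filled batch

    if __name__ == '__main__':
        for group in batch_by_size([300 * 1024, 800 * 1024, 2 * 1024 * 1024, 100 * 1024]):
            print(group)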
@@ -870,11 +1006,129 @@ self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) + self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel) self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel) + self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES + try: + request = remote_execution_pb2.GetCapabilitiesRequest() + response = self.capabilities.GetCapabilities(request) + server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes + if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes: + self.max_batch_total_size_bytes = server_max_batch_total_size_bytes + except grpc.RpcError as e: + # Simply use the defaults for servers that don't implement GetCapabilities() + if e.code() != grpc.StatusCode.UNIMPLEMENTED: + raise + + # Check whether the server supports BatchReadBlobs() + self.batch_read_supported = False + try: + request = remote_execution_pb2.BatchReadBlobsRequest() + response = self.cas.BatchReadBlobs(request) + self.batch_read_supported = True + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.UNIMPLEMENTED: + raise + + # Check whether the server supports BatchUpdateBlobs() + self.batch_update_supported = False + try: + request = remote_execution_pb2.BatchUpdateBlobsRequest() + response = self.cas.BatchUpdateBlobs(request) + self.batch_update_supported = True + except grpc.RpcError as e: + if (e.code() != grpc.StatusCode.UNIMPLEMENTED and + e.code() != grpc.StatusCode.PERMISSION_DENIED): + raise + self._initialized = True +# Represents a batch of blobs queued for fetching. +# +class _CASBatchRead(): + def __init__(self, remote): + self._remote = remote + self._max_total_size_bytes = remote.max_batch_total_size_bytes + self._request = remote_execution_pb2.BatchReadBlobsRequest() + self._size = 0 + self._sent = False + + def add(self, digest): + assert not self._sent + + new_batch_size = self._size + digest.size_bytes + if new_batch_size > self._max_total_size_bytes: + # Not enough space left in current batch + return False + + request_digest = self._request.digests.add() + request_digest.hash = digest.hash + request_digest.size_bytes = digest.size_bytes + self._size = new_batch_size + return True + + def send(self): + assert not self._sent + self._sent = True + + if len(self._request.digests) == 0: + return + + batch_response = self._remote.cas.BatchReadBlobs(self._request) + + for response in batch_response.responses: + if response.status.code != grpc.StatusCode.OK.value[0]: + raise ArtifactError("Failed to download blob {}: {}".format( + response.digest.hash, response.status.code)) + if response.digest.size_bytes != len(response.data): + raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + response.digest.hash, response.digest.size_bytes, len(response.data))) + + yield (response.digest, response.data) + + +# Represents a batch of blobs queued for upload. 
+# +class _CASBatchUpdate(): + def __init__(self, remote): + self._remote = remote + self._max_total_size_bytes = remote.max_batch_total_size_bytes + self._request = remote_execution_pb2.BatchUpdateBlobsRequest() + self._size = 0 + self._sent = False + + def add(self, digest, stream): + assert not self._sent + + new_batch_size = self._size + digest.size_bytes + if new_batch_size > self._max_total_size_bytes: + # Not enough space left in current batch + return False + + blob_request = self._request.requests.add() + blob_request.digest.hash = digest.hash + blob_request.digest.size_bytes = digest.size_bytes + blob_request.data = stream.read(digest.size_bytes) + self._size = new_batch_size + return True + + def send(self): + assert not self._sent + self._sent = True + + if len(self._request.requests) == 0: + return + + batch_response = self._remote.cas.BatchUpdateBlobs(self._request) + + for response in batch_response.responses: + if response.status.code != grpc.StatusCode.OK.value[0]: + raise ArtifactError("Failed to upload blob {}: {}".format( + response.digest.hash, response.status.code)) + + def _grouper(iterable, n): while True: try: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_artifactcache/casserver.py new/BuildStream-1.2.3/buildstream/_artifactcache/casserver.py --- old/BuildStream-1.2.2/buildstream/_artifactcache/casserver.py 2018-09-24 16:19:52.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_artifactcache/casserver.py 2018-10-04 16:40:53.000000000 +0200 @@ -38,8 +38,9 @@ from .cascache import CASCache -# The default limit for gRPC messages is 4 MiB -_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024 +# The default limit for gRPC messages is 4 MiB. +# Limit payload to 1 MiB to leave sufficient headroom for metadata. 
+_MAX_PAYLOAD_BYTES = 1024 * 1024 # Trying to push an artifact that is too large @@ -69,7 +70,7 @@ _ByteStreamServicer(artifactcache, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache), server) + _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) @@ -158,7 +159,7 @@ remaining = client_digest.size_bytes - request.read_offset while remaining > 0: - chunk_size = min(remaining, 64 * 1024) + chunk_size = min(remaining, _MAX_PAYLOAD_BYTES) remaining -= chunk_size response = bytestream_pb2.ReadResponse() @@ -223,9 +224,10 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer): - def __init__(self, cas): + def __init__(self, cas, *, enable_push): super().__init__() self.cas = cas + self.enable_push = enable_push def FindMissingBlobs(self, request, context): response = remote_execution_pb2.FindMissingBlobsResponse() @@ -242,7 +244,7 @@ for digest in request.digests: batch_size += digest.size_bytes - if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES: + if batch_size > _MAX_PAYLOAD_BYTES: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) return response @@ -261,6 +263,46 @@ return response + def BatchUpdateBlobs(self, request, context): + response = remote_execution_pb2.BatchUpdateBlobsResponse() + + if not self.enable_push: + context.set_code(grpc.StatusCode.PERMISSION_DENIED) + return response + + batch_size = 0 + + for blob_request in request.requests: + digest = blob_request.digest + + batch_size += digest.size_bytes + if batch_size > _MAX_PAYLOAD_BYTES: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return response + + blob_response = response.responses.add() + blob_response.digest.hash = digest.hash + blob_response.digest.size_bytes = digest.size_bytes + + if len(blob_request.data) != digest.size_bytes: + blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION + continue + + try: + _clean_up_cache(self.cas, digest.size_bytes) + + with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out: + out.write(blob_request.data) + out.flush() + server_digest = self.cas.add_object(path=out.name) + if server_digest.hash != digest.hash: + blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION + + except ArtifactTooLargeException: + blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED + + return response + class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): def GetCapabilities(self, request, context): @@ -269,7 +311,7 @@ cache_capabilities = response.cache_capabilities cache_capabilities.digest_function.append(remote_execution_pb2.SHA256) cache_capabilities.action_cache_update_capabilities.update_enabled = False - cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES + cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED response.deprecated_api_version.major = 2 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_scheduler/jobs/job.py new/BuildStream-1.2.3/buildstream/_scheduler/jobs/job.py --- old/BuildStream-1.2.2/buildstream/_scheduler/jobs/job.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_scheduler/jobs/job.py 2018-10-04 
16:40:53.000000000 +0200 @@ -119,6 +119,8 @@ self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs self._skipped_flag = False # Indicate whether the job was skipped. + self._terminated = False # Whether this job has been explicitly terminated + # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. # self._retry_flag = True @@ -188,6 +190,8 @@ # Terminate the process using multiprocessing API pathway self._process.terminate() + self._terminated = True + # terminate_wait() # # Wait for terminated jobs to complete @@ -271,18 +275,22 @@ # running the integration commands). # # Args: - # (int): The plugin identifier for this task + # task_id (int): The plugin identifier for this task # def set_task_id(self, task_id): self._task_id = task_id # skipped # + # This will evaluate to True if the job was skipped + # during processing, or if it was forcefully terminated. + # # Returns: - # bool: True if the job was skipped while processing. + # (bool): Whether the job should appear as skipped + # @property def skipped(self): - return self._skipped_flag + return self._skipped_flag or self._terminated ####################################################### # Abstract Methods # diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_scheduler/queues/buildqueue.py new/BuildStream-1.2.3/buildstream/_scheduler/queues/buildqueue.py --- old/BuildStream-1.2.2/buildstream/_scheduler/queues/buildqueue.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_scheduler/queues/buildqueue.py 2018-10-04 16:40:53.000000000 +0200 @@ -65,7 +65,7 @@ # If the estimated size outgrows the quota, ask the scheduler # to queue a job to actually check the real cache size. # - if artifacts.get_quota_exceeded(): + if artifacts.has_quota_exceeded(): self._scheduler.check_cache_size() def done(self, job, element, result, success): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_scheduler/queues/queue.py new/BuildStream-1.2.3/buildstream/_scheduler/queues/queue.py --- old/BuildStream-1.2.2/buildstream/_scheduler/queues/queue.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_scheduler/queues/queue.py 2018-10-04 16:40:53.000000000 +0200 @@ -325,15 +325,22 @@ detail=traceback.format_exc()) self.failed_elements.append(element) else: - - # No exception occured, handle the success/failure state in the normal way # + # No exception occured in post processing + # + + # Only place in the output done queue if the job + # was considered successful if success: self._done_queue.append(job) - if not job.skipped: - self.processed_elements.append(element) - else: - self.skipped_elements.append(element) + + # A Job can be skipped whether or not it has failed, + # we want to only bookkeep them as processed or failed + # if they are not skipped. 
+ if job.skipped: + self.skipped_elements.append(element) + elif success: + self.processed_elements.append(element) else: self.failed_elements.append(element) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_scheduler/scheduler.py new/BuildStream-1.2.3/buildstream/_scheduler/scheduler.py --- old/BuildStream-1.2.2/buildstream/_scheduler/scheduler.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_scheduler/scheduler.py 2018-10-04 16:40:53.000000000 +0200 @@ -349,7 +349,7 @@ platform = Platform.get_platform() artifacts = platform.artifactcache - if not artifacts.get_quota_exceeded(): + if not artifacts.has_quota_exceeded(): return job = CleanupJob(self, 'cleanup', 'cleanup/cleanup', @@ -387,6 +387,15 @@ # A loop registered event callback for keyboard interrupts # def _interrupt_event(self): + + # FIXME: This should not be needed, but for some reason we receive an + # additional SIGINT event when the user hits ^C a second time + # to inform us that they really intend to terminate; even though + # we have disconnected our handlers at this time. + # + if self.terminated: + return + # Leave this to the frontend to decide, if no # interrrupt callback was specified, then just terminate. if self._interrupt_callback: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_version.py new/BuildStream-1.2.3/buildstream/_version.py --- old/BuildStream-1.2.2/buildstream/_version.py 2018-09-24 16:35:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_version.py 2018-10-05 09:09:03.000000000 +0200 @@ -8,11 +8,11 @@ version_json = ''' { - "date": "2018-09-24T15:28:37+0100", + "date": "2018-10-05T16:01:57+0900", "dirty": false, "error": null, - "full-revisionid": "d423cdc761a808960ea21302f89b5383fa9558b4", - "version": "1.2.2" + "full-revisionid": "10abe77fe8d77385d86f225b503d9185f4ef7f3a", + "version": "1.2.3" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/_yaml.py new/BuildStream-1.2.3/buildstream/_yaml.py --- old/BuildStream-1.2.2/buildstream/_yaml.py 2018-09-23 16:27:35.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/_yaml.py 2018-10-04 14:13:11.000000000 +0200 @@ -467,7 +467,7 @@ "{}: Specified path '{}' does not exist" .format(provenance, path_str)) - is_inside = project_dir_path in full_resolved_path.parents or ( + is_inside = project_dir_path.resolve() in full_resolved_path.parents or ( full_resolved_path == project_dir_path) if path.is_absolute() or not is_inside: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/plugins/sources/git.py new/BuildStream-1.2.3/buildstream/plugins/sources/git.py --- old/BuildStream-1.2.2/buildstream/plugins/sources/git.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/plugins/sources/git.py 2018-10-04 16:40:53.000000000 +0200 @@ -164,10 +164,18 @@ cwd=self.mirror) def fetch(self, alias_override=None): - self.ensure(alias_override) - if not self.has_ref(): - self._fetch(alias_override) - self.assert_ref() + # Resolve the URL for the message + resolved_url = self.source.translate_url(self.url, + alias_override=alias_override, + primary=self.primary) + + with self.source.timed_activity("Fetching from {}" + .format(resolved_url), + silent_nested=True): + self.ensure(alias_override) + if not 
self.has_ref(): + self._fetch(alias_override) + self.assert_ref() def has_ref(self): if not self.ref: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/source.py new/BuildStream-1.2.3/buildstream/source.py --- old/BuildStream-1.2.2/buildstream/source.py 2018-09-24 16:19:52.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/source.py 2018-10-04 16:40:53.000000000 +0200 @@ -585,28 +585,48 @@ # def _fetch(self): project = self._get_project() - source_fetchers = self.get_source_fetchers() + context = self._get_context() + + # Silence the STATUS messages which might happen as a result + # of checking the source fetchers. + with context.silence(): + source_fetchers = self.get_source_fetchers() # Use the source fetchers if they are provided # if source_fetchers: - for fetcher in source_fetchers: - alias = fetcher._get_alias() - for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): - try: - fetcher.fetch(uri) - # FIXME: Need to consider temporary vs. permanent failures, - # and how this works with retries. - except BstError as e: - last_error = e - continue - - # No error, we're done with this fetcher - break - - else: - # No break occurred, raise the last detected error - raise last_error + + # Use a contorted loop here, this is to allow us to + # silence the messages which can result from consuming + # the items of source_fetchers, if it happens to be a generator. + # + source_fetchers = iter(source_fetchers) + try: + + while True: + + with context.silence(): + fetcher = next(source_fetchers) + + alias = fetcher._get_alias() + for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): + try: + fetcher.fetch(uri) + # FIXME: Need to consider temporary vs. permanent failures, + # and how this works with retries. + except BstError as e: + last_error = e + continue + + # No error, we're done with this fetcher + break + + else: + # No break occurred, raise the last detected error + raise last_error + + except StopIteration: + pass # Default codepath is to reinstantiate the Source # diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/buildstream/utils.py new/BuildStream-1.2.3/buildstream/utils.py --- old/BuildStream-1.2.2/buildstream/utils.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/buildstream/utils.py 2018-10-05 08:44:50.000000000 +0200 @@ -496,7 +496,7 @@ @contextmanager def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, - errors=None, newline=None, closefd=True, opener=None): + errors=None, newline=None, closefd=True, opener=None, tempdir=None): """Save a file with a temporary name and rename it into place when ready. This is a context manager which is meant for saving data to files. @@ -523,8 +523,9 @@ # https://bugs.python.org/issue8604 assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path" - dirname = os.path.dirname(filename) - fd, tempname = tempfile.mkstemp(dir=dirname) + if tempdir is None: + tempdir = os.path.dirname(filename) + fd, tempname = tempfile.mkstemp(dir=tempdir) os.close(fd) f = open(tempname, mode=mode, buffering=buffering, encoding=encoding, @@ -556,6 +557,9 @@ # # Get the disk usage of a given directory in bytes. # +# This function assumes that files do not inadvertantly +# disappear while this function is running. +# # Arguments: # (str) The path whose size to check. 
# @@ -675,7 +679,7 @@ try: shutil.rmtree(rootpath, **kwargs) - except shutil.Error as e: + except OSError as e: raise UtilError("Failed to remove cache directory '{}': {}" .format(rootpath, e)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/BuildStream-1.2.2/tests/format/project.py new/BuildStream-1.2.3/tests/format/project.py --- old/BuildStream-1.2.2/tests/format/project.py 2018-09-24 16:19:50.000000000 +0200 +++ new/BuildStream-1.2.3/tests/format/project.py 2018-10-04 16:40:53.000000000 +0200 @@ -181,3 +181,15 @@ # Assert that the cache keys are different assert result1.output != result2.output + + +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path')) +def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir): + real_project = str(datafiles) + linked_project = os.path.join(str(tmpdir), 'linked') + os.symlink(real_project, linked_project) + os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True) + with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f: + f.write("kind: manual\n") + result = cli.run(project=linked_project, args=['show', 'element.bst']) + result.assert_success()
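The final test exercises the _yaml.py change above: when a project is opened through a symlinked path, the containment check only passes if the project directory itself is resolved before comparing it against the resolved target's parents. A small standalone pathlib illustration of that behaviour, using a throwaway temporary directory (the paths and file names are made up and this is not BuildStream API):

    import os
    import tempfile
    from pathlib import Path

    # A real project directory and a symlink pointing at it.
    workdir = Path(tempfile.mkdtemp())
    real_project = workdir / 'real-project'
    (real_project / 'elements').mkdir(parents=True)
    linked_project = workdir / 'linked'
    os.symlink(real_project, linked_project)

    project_dir = linked_project
    element = (project_dir / 'elements' / 'element.bst').resolve()

    # The resolved element path lives under the real directory, so the
    # unresolved symlink path never shows up among its parents ...
    print(project_dir in element.parents)            # False
    # ... whereas resolving the project directory first makes the check pass,
    # which is what `project_dir_path.resolve() in full_resolved_path.parents` does.
    print(project_dir.resolve() in element.parents)  # True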