Dre4m Shell
Server IP : 127.0.0.2  /  Your IP : 18.227.89.143
Web Server : Apache/2.4.18 (Ubuntu)
System :
User : www-data ( )
PHP Version : 7.0.33-0ubuntu0.16.04.16
Disable Function : disk_free_space,disk_total_space,diskfreespace,dl,exec,fpaththru,getmyuid,getmypid,highlight_file,ignore_user_abord,leak,listen,link,opcache_get_configuration,opcache_get_status,passthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,php_uname,phpinfo,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setuid,posix_times,posix_ttyname,posix_uname,pclose,popen,proc_open,proc_close,proc_get_status,proc_nice,proc_terminate,shell_exec,source,show_source,system,virtual
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python2.7/dist-packages/bzrlib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python2.7/dist-packages/bzrlib/annotate.py
# Copyright (C) 2005-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""File annotate based on weave storage"""

from __future__ import absolute_import

# TODO: Choice of more or less verbose formats:
#
# interposed: show more details between blocks of modified lines

# TODO: Show which revision caused a line to merge into the parent

# TODO: perhaps abbreviate timescales depending on how recent they are
# e.g. "3:12 Tue", "13 Oct", "Oct 2005", etc.

import sys
import time

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    patiencediff,
    tsort,
    )
""")
from bzrlib import (
    errors,
    osutils,
    )
from bzrlib.config import extract_email_address
from bzrlib.repository import _strip_NULL_ghosts
from bzrlib.revision import (
    CURRENT_REVISION,
    Revision,
    )


def annotate_file_tree(tree, file_id, to_file, verbose=False, full=False,
    show_ids=False, branch=None):
    """Annotate file_id in a tree.

    The tree should already be read_locked() when annotate_file_tree is called.

    :param tree: The tree to look for revision numbers and history from.
    :param file_id: The file_id to annotate.
    :param to_file: The file to output the annotation to.
    :param verbose: Show all details rather than truncating to ensure
        reasonable text width.
    :param full: XXXX Not sure what this does.
    :param show_ids: Show revision ids in the annotation output.
    :param branch: Branch to use for revision revno lookups
    """
    if branch is None:
        branch = tree.branch
    if to_file is None:
        to_file = sys.stdout

    # Handle the show_ids case
    annotations = list(tree.annotate_iter(file_id))
    if show_ids:
        return _show_id_annotations(annotations, to_file, full)

    if not getattr(tree, "get_revision_id", False):
        # Create a virtual revision to represent the current tree state.
        # Should get some more pending commit attributes, like pending tags,
        # bugfixes etc.
        current_rev = Revision(CURRENT_REVISION)
        current_rev.parent_ids = tree.get_parent_ids()
        try:
            current_rev.committer = branch.get_config_stack().get('email')
        except errors.NoWhoami:
            current_rev.committer = 'local user'
        current_rev.message = "?"
        current_rev.timestamp = round(time.time(), 3)
        current_rev.timezone = osutils.local_time_offset()
    else:
        current_rev = None
    annotation = list(_expand_annotations(annotations, branch,
        current_rev))
    _print_annotations(annotation, verbose, to_file, full)


def _print_annotations(annotation, verbose, to_file, full):
    """Print annotations to to_file.

    :param to_file: The file to output the annotation to.
    :param verbose: Show all details rather than truncating to ensure
        reasonable text width.
    :param full: XXXX Not sure what this does.
    """
    if len(annotation) == 0:
        max_origin_len = max_revno_len = max_revid_len = 0
    else:
        max_origin_len = max(len(x[1]) for x in annotation)
        max_revno_len = max(len(x[0]) for x in annotation)
        max_revid_len = max(len(x[3]) for x in annotation)
    if not verbose:
        max_revno_len = min(max_revno_len, 12)
    max_revno_len = max(max_revno_len, 3)

    # Output the annotations
    prevanno = ''
    encoding = getattr(to_file, 'encoding', None) or \
            osutils.get_terminal_encoding()
    for (revno_str, author, date_str, line_rev_id, text) in annotation:
        if verbose:
            anno = '%-*s %-*s %8s ' % (max_revno_len, revno_str,
                                       max_origin_len, author, date_str)
        else:
            if len(revno_str) > max_revno_len:
                revno_str = revno_str[:max_revno_len-1] + '>'
            anno = "%-*s %-7s " % (max_revno_len, revno_str, author[:7])
        if anno.lstrip() == "" and full:
            anno = prevanno
        try:
            to_file.write(anno)
        except UnicodeEncodeError:
            # cmd_annotate should be passing in an 'exact' object, which means
            # we have a direct handle to sys.stdout or equivalent. It may not
            # be able to handle the exact Unicode characters, but 'annotate' is
            # a user function (non-scripting), so shouldn't die because of
            # unrepresentable annotation characters. So encode using 'replace',
            # and write them again.
            to_file.write(anno.encode(encoding, 'replace'))
        to_file.write('| %s\n' % (text,))
        prevanno = anno


def _show_id_annotations(annotations, to_file, full):
    if not annotations:
        return
    last_rev_id = None
    max_origin_len = max(len(origin) for origin, text in annotations)
    for origin, text in annotations:
        if full or last_rev_id != origin:
            this = origin
        else:
            this = ''
        to_file.write('%*s | %s' % (max_origin_len, this, text))
        last_rev_id = origin
    return


def _expand_annotations(annotations, branch, current_rev=None):
    """Expand a file's annotations into command line UI ready tuples.

    Each tuple includes detailed information, such as the author name, and date
    string for the commit, rather than just the revision id.

    :param annotations: The annotations to expand.
    :param revision_id_to_revno: A map from id to revision numbers.
    :param branch: A locked branch to query for revision details.
    """
    repository = branch.repository
    if current_rev is not None:
        # This can probably become a function on MutableTree, get_revno_map
        # there, or something.
        last_revision = current_rev.revision_id
        # XXX: Partially Cloned from branch, uses the old_get_graph, eep.
        # XXX: The main difficulty is that we need to inject a single new node
        #      (current_rev) into the graph before it gets numbered, etc.
        #      Once KnownGraph gets an 'add_node()' function, we can use
        #      VF.get_known_graph_ancestry().
        graph = repository.get_graph()
        revision_graph = dict(((key, value) for key, value in
            graph.iter_ancestry(current_rev.parent_ids) if value is not None))
        revision_graph = _strip_NULL_ghosts(revision_graph)
        revision_graph[last_revision] = current_rev.parent_ids
        merge_sorted_revisions = tsort.merge_sort(
            revision_graph,
            last_revision,
            None,
            generate_revno=True)
        revision_id_to_revno = dict((rev_id, revno)
            for seq_num, rev_id, depth, revno, end_of_merge in
                merge_sorted_revisions)
    else:
        revision_id_to_revno = branch.get_revision_id_to_revno_map()
    last_origin = None
    revision_ids = set(o for o, t in annotations)
    revisions = {}
    if CURRENT_REVISION in revision_ids:
        revision_id_to_revno[CURRENT_REVISION] = (
            "%d?" % (branch.revno() + 1),)
        revisions[CURRENT_REVISION] = current_rev
    revision_ids = [o for o in revision_ids if
                    repository.has_revision(o)]
    revisions.update((r.revision_id, r) for r in
                     repository.get_revisions(revision_ids))
    for origin, text in annotations:
        text = text.rstrip('\r\n')
        if origin == last_origin:
            (revno_str, author, date_str) = ('','','')
        else:
            last_origin = origin
            if origin not in revisions:
                (revno_str, author, date_str) = ('?','?','?')
            else:
                revno_str = '.'.join(str(i) for i in
                                            revision_id_to_revno[origin])
            rev = revisions[origin]
            tz = rev.timezone or 0
            date_str = time.strftime('%Y%m%d',
                                     time.gmtime(rev.timestamp + tz))
            # a lazy way to get something like the email address
            # TODO: Get real email address
            author = rev.get_apparent_authors()[0]
            try:
                author = extract_email_address(author)
            except errors.NoEmailInUsername:
                pass        # use the whole name
        yield (revno_str, author, date_str, origin, text)


def reannotate(parents_lines, new_lines, new_revision_id,
               _left_matching_blocks=None,
               heads_provider=None):
    """Create a new annotated version from new lines and parent annotations.

    :param parents_lines: List of annotated lines for all parents
    :param new_lines: The un-annotated new lines
    :param new_revision_id: The revision-id to associate with new lines
        (will often be CURRENT_REVISION)
    :param left_matching_blocks: a hint about which areas are common
        between the text and its left-hand-parent.  The format is
        the SequenceMatcher.get_matching_blocks format
        (start_left, start_right, length_of_match).
    :param heads_provider: An object which provides a .heads() call to resolve
        if any revision ids are children of others.
        If None, then any ancestry disputes will be resolved with
        new_revision_id
    """
    if len(parents_lines) == 0:
        lines = [(new_revision_id, line) for line in new_lines]
    elif len(parents_lines) == 1:
        lines = _reannotate(parents_lines[0], new_lines, new_revision_id,
                            _left_matching_blocks)
    elif len(parents_lines) == 2:
        left = _reannotate(parents_lines[0], new_lines, new_revision_id,
                           _left_matching_blocks)
        lines = _reannotate_annotated(parents_lines[1], new_lines,
                                      new_revision_id, left,
                                      heads_provider)
    else:
        reannotations = [_reannotate(parents_lines[0], new_lines,
                                     new_revision_id, _left_matching_blocks)]
        reannotations.extend(_reannotate(p, new_lines, new_revision_id)
                             for p in parents_lines[1:])
        lines = []
        for annos in zip(*reannotations):
            origins = set(a for a, l in annos)
            if len(origins) == 1:
                # All the parents agree, so just return the first one
                lines.append(annos[0])
            else:
                line = annos[0][1]
                if len(origins) == 2 and new_revision_id in origins:
                    origins.remove(new_revision_id)
                if len(origins) == 1:
                    lines.append((origins.pop(), line))
                else:
                    lines.append((new_revision_id, line))
    return lines


def _reannotate(parent_lines, new_lines, new_revision_id,
                matching_blocks=None):
    new_cur = 0
    if matching_blocks is None:
        plain_parent_lines = [l for r, l in parent_lines]
        matcher = patiencediff.PatienceSequenceMatcher(None,
            plain_parent_lines, new_lines)
        matching_blocks = matcher.get_matching_blocks()
    lines = []
    for i, j, n in matching_blocks:
        for line in new_lines[new_cur:j]:
            lines.append((new_revision_id, line))
        lines.extend(parent_lines[i:i+n])
        new_cur = j + n
    return lines


def _get_matching_blocks(old, new):
    matcher = patiencediff.PatienceSequenceMatcher(None, old, new)
    return matcher.get_matching_blocks()


_break_annotation_tie = None

def _old_break_annotation_tie(annotated_lines):
    """Chose an attribution between several possible ones.

    :param annotated_lines: A list of tuples ((file_id, rev_id), line) where
        the lines are identical but the revids different while no parent
        relation exist between them

     :return : The "winning" line. This must be one with a revid that
         guarantees that further criss-cross merges will converge. Failing to
         do so have performance implications.
    """
    # sort lexicographically so that we always get a stable result.

    # TODO: while 'sort' is the easiest (and nearly the only possible solution)
    # with the current implementation, chosing the oldest revision is known to
    # provide better results (as in matching user expectations). The most
    # common use case being manual cherry-pick from an already existing
    # revision.
    return sorted(annotated_lines)[0]


def _find_matching_unannotated_lines(output_lines, plain_child_lines,
                                     child_lines, start_child, end_child,
                                     right_lines, start_right, end_right,
                                     heads_provider, revision_id):
    """Find lines in plain_right_lines that match the existing lines.

    :param output_lines: Append final annotated lines to this list
    :param plain_child_lines: The unannotated new lines for the child text
    :param child_lines: Lines for the child text which have been annotated
        for the left parent

    :param start_child: Position in plain_child_lines and child_lines to start
        the match searching
    :param end_child: Last position in plain_child_lines and child_lines to
        search for a match
    :param right_lines: The annotated lines for the whole text for the right
        parent
    :param start_right: Position in right_lines to start the match
    :param end_right: Last position in right_lines to search for a match
    :param heads_provider: When parents disagree on the lineage of a line, we
        need to check if one side supersedes the other
    :param revision_id: The label to give if a line should be labeled 'tip'
    """
    output_extend = output_lines.extend
    output_append = output_lines.append
    # We need to see if any of the unannotated lines match
    plain_right_subset = [l for a,l in right_lines[start_right:end_right]]
    plain_child_subset = plain_child_lines[start_child:end_child]
    match_blocks = _get_matching_blocks(plain_right_subset, plain_child_subset)

    last_child_idx = 0

    for right_idx, child_idx, match_len in match_blocks:
        # All the lines that don't match are just passed along
        if child_idx > last_child_idx:
            output_extend(child_lines[start_child + last_child_idx
                                      :start_child + child_idx])
        for offset in xrange(match_len):
            left = child_lines[start_child+child_idx+offset]
            right = right_lines[start_right+right_idx+offset]
            if left[0] == right[0]:
                # The annotations match, just return the left one
                output_append(left)
            elif left[0] == revision_id:
                # The left parent marked this as unmatched, so let the
                # right parent claim it
                output_append(right)
            else:
                # Left and Right both claim this line
                if heads_provider is None:
                    output_append((revision_id, left[1]))
                else:
                    heads = heads_provider.heads((left[0], right[0]))
                    if len(heads) == 1:
                        output_append((iter(heads).next(), left[1]))
                    else:
                        # Both claim different origins, get a stable result.
                        # If the result is not stable, there is a risk a
                        # performance degradation as criss-cross merges will
                        # flip-flop the attribution.
                        if _break_annotation_tie is None:
                            output_append(
                                _old_break_annotation_tie([left, right]))
                        else:
                            output_append(_break_annotation_tie([left, right]))
        last_child_idx = child_idx + match_len


def _reannotate_annotated(right_parent_lines, new_lines, new_revision_id,
                          annotated_lines, heads_provider):
    """Update the annotations for a node based on another parent.

    :param right_parent_lines: A list of annotated lines for the right-hand
        parent.
    :param new_lines: The unannotated new lines.
    :param new_revision_id: The revision_id to attribute to lines which are not
        present in either parent.
    :param annotated_lines: A list of annotated lines. This should be the
        annotation of new_lines based on parents seen so far.
    :param heads_provider: When parents disagree on the lineage of a line, we
        need to check if one side supersedes the other.
    """
    if len(new_lines) != len(annotated_lines):
        raise AssertionError("mismatched new_lines and annotated_lines")
    # First compare the newly annotated lines with the right annotated lines.
    # Lines which were not changed in left or right should match. This tends to
    # be the bulk of the lines, and they will need no further processing.
    lines = []
    lines_extend = lines.extend
    last_right_idx = 0 # The line just after the last match from the right side
    last_left_idx = 0
    matching_left_and_right = _get_matching_blocks(right_parent_lines,
                                                   annotated_lines)
    for right_idx, left_idx, match_len in matching_left_and_right:
        # annotated lines from last_left_idx to left_idx did not match the
        # lines from last_right_idx to right_idx, the raw lines should be
        # compared to determine what annotations need to be updated
        if last_right_idx == right_idx or last_left_idx == left_idx:
            # One of the sides is empty, so this is a pure insertion
            lines_extend(annotated_lines[last_left_idx:left_idx])
        else:
            # We need to see if any of the unannotated lines match
            _find_matching_unannotated_lines(lines,
                                             new_lines, annotated_lines,
                                             last_left_idx, left_idx,
                                             right_parent_lines,
                                             last_right_idx, right_idx,
                                             heads_provider,
                                             new_revision_id)
        last_right_idx = right_idx + match_len
        last_left_idx = left_idx + match_len
        # If left and right agree on a range, just push that into the output
        lines_extend(annotated_lines[left_idx:left_idx + match_len])
    return lines


try:
    from bzrlib._annotator_pyx import Annotator
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._annotator_py import Annotator

Anon7 - 2022
AnonSec Team