# File: /usr/lib/python2.7/dist-packages/bzrlib/_groupcompress_py.py
# Copyright (C) 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Python version of compiled extensions for doing compression.

We keep this implementation separate from groupcompress.py to avoid importing
unnecessary code.
"""

from __future__ import absolute_import

from bzrlib import osutils


class _OutputHandler(object):
    """A simple class which just tracks how to split up an insert request."""

    def __init__(self, out_lines, index_lines, min_len_to_index):
        self.out_lines = out_lines
        self.index_lines = index_lines
        self.min_len_to_index = min_len_to_index
        self.cur_insert_lines = []
        self.cur_insert_len = 0

    def add_copy(self, start_byte, end_byte):
        # The data stream allows >64kB in a copy, but to match the compiled
        # code, we will also limit it to a 64kB copy
        for start_byte in xrange(start_byte, end_byte, 64*1024):
            num_bytes = min(64*1024, end_byte - start_byte)
            copy_bytes = encode_copy_instruction(start_byte, num_bytes)
            self.out_lines.append(copy_bytes)
            self.index_lines.append(False)

    def _flush_insert(self):
        if not self.cur_insert_lines:
            return
        if self.cur_insert_len > 127:
            raise AssertionError('We cannot insert more than 127 bytes'
                                 ' at a time.')
        self.out_lines.append(chr(self.cur_insert_len))
        self.index_lines.append(False)
        self.out_lines.extend(self.cur_insert_lines)
        if self.cur_insert_len < self.min_len_to_index:
            self.index_lines.extend([False]*len(self.cur_insert_lines))
        else:
            self.index_lines.extend([True]*len(self.cur_insert_lines))
        self.cur_insert_lines = []
        self.cur_insert_len = 0

    def _insert_long_line(self, line):
        # Flush out anything pending
        self._flush_insert()
        line_len = len(line)
        for start_index in xrange(0, line_len, 127):
            next_len = min(127, line_len - start_index)
            self.out_lines.append(chr(next_len))
            self.index_lines.append(False)
            self.out_lines.append(line[start_index:start_index+next_len])
            # We don't index long lines, because we won't be able to match
            # a line split across multiple inserts anyway
            self.index_lines.append(False)

    def add_insert(self, lines):
        if self.cur_insert_lines != []:
            raise AssertionError('self.cur_insert_lines must be empty when'
                                 ' adding a new insert')
        for line in lines:
            if len(line) > 127:
                self._insert_long_line(line)
            else:
                next_len = len(line) + self.cur_insert_len
                if next_len > 127:
                    # Adding this line would overflow, so flush, and start over
                    self._flush_insert()
                    self.cur_insert_lines = [line]
                    self.cur_insert_len = len(line)
                else:
                    self.cur_insert_lines.append(line)
                    self.cur_insert_len = next_len
        self._flush_insert()
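

# A minimal illustrative sketch (the _demo_* helper below is only an example,
# not part of the original module): _OutputHandler length-prefixes each insert
# in chunks of at most 127 bytes and splits copies into at most 64kB pieces,
# as described in the comments above.
def _demo_output_handler():
    out_lines = []
    index_lines = []
    handler = _OutputHandler(out_lines, index_lines, min_len_to_index=10)
    handler.add_insert(['short line\n'])
    # out_lines now starts with chr(11) (the insert length) followed by the
    # line itself; the line is indexed because it is >= min_len_to_index.
    handler.add_copy(0, 200 * 1024)
    # add_copy appended four copy instructions: three 64kB copies and one
    # 8kB copy covering the remaining bytes.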


class LinesDeltaIndex(object):
    """This class indexes matches between strings.

    :ivar lines: The 'static' lines that will be preserved between runs.
    :ivar _matching_lines: A dict of {line:[matching offsets]}
    :ivar line_offsets: The byte offset for the end of each line, used to
        quickly map between a matching line number and the byte location
    :ivar endpoint: The total number of bytes in self.line_offsets
    """

    _MIN_MATCH_BYTES = 10
    _SOFT_MIN_MATCH_BYTES = 200

    def __init__(self, lines):
        self.lines = []
        self.line_offsets = []
        self.endpoint = 0
        self._matching_lines = {}
        self.extend_lines(lines, [True]*len(lines))

    def _update_matching_lines(self, new_lines, index):
        matches = self._matching_lines
        start_idx = len(self.lines)
        if len(new_lines) != len(index):
            raise AssertionError('The number of lines to be indexed does'
                ' not match the index/don\'t index flags: %d != %d'
                % (len(new_lines), len(index)))
        for idx, do_index in enumerate(index):
            if not do_index:
                continue
            line = new_lines[idx]
            try:
                matches[line].add(start_idx + idx)
            except KeyError:
                matches[line] = set([start_idx + idx])

    def get_matches(self, line):
        """Return the lines which match the line in right."""
        try:
            return self._matching_lines[line]
        except KeyError:
            return None

    def _get_longest_match(self, lines, pos):
        """Look at all matches for the current line, return the longest.

        :param lines: The lines we are matching against
        :param pos: The current location we care about
        :return: (match, new_pos) where match is None if nothing starting at
            pos matches (the line should be inserted in the target), or a
            tuple (start_in_self, start_in_lines, num_lines) describing the
            longest matching run. All values are offsets in the respective
            line lists (aka line numbers), and new_pos is the position in
            lines to continue scanning from.
        """
        range_start = pos
        range_len = 0
        prev_locations = None
        max_pos = len(lines)
        matching = self._matching_lines
        while pos < max_pos:
            try:
                locations = matching[lines[pos]]
            except KeyError:
                # No more matches, just return whatever we have, but we know
                # that this last position is not going to match anything
                pos += 1
                break
            # We have a match
            if prev_locations is None:
                # This is the first match in a range
                prev_locations = locations
                range_len = 1
                locations = None # Consumed
            else:
                # We have a match started, compare to see if any of the
                # current matches can be continued
                next_locations = locations.intersection([loc + 1 for loc
                                                         in prev_locations])
                if next_locations:
                    # At least one of the regions continues to match
                    prev_locations = set(next_locations)
                    range_len += 1
                    locations = None # Consumed
                else:
                    # None of the current regions continue to match. This
                    # line does still match something, just not at the end of
                    # the previous matches, so stop extending the current run
                    # here and return what we have.
                    break
            pos += 1
        if prev_locations is None:
            # We have no matches, this is a pure insert
            return None, pos
        smallest = min(prev_locations)
        return (smallest - range_len + 1, range_start, range_len), pos

    def get_matching_blocks(self, lines, soft=False):
        """Return the ranges in lines which match self.lines.

        :param lines: lines to compress
        :return: A list of (old_start, new_start, length) tuples which reflect
            a region in self.lines that is present in lines.  The last element
            of the list is always (old_len, new_len, 0) to provide an end point
            for generating instructions from the matching blocks list.
        """
        # In this code, we iterate over multiple _get_longest_match calls, to
        # find the next longest copy, and possible insert regions. We then
        # convert that to the simple matching_blocks representation, since
        # otherwise inserting 10 lines in a row would show up as 10
        # instructions.
        result = []
        pos = 0
        max_pos = len(lines)
        result_append = result.append
        min_match_bytes = self._MIN_MATCH_BYTES
        if soft:
            min_match_bytes = self._SOFT_MIN_MATCH_BYTES
        while pos < max_pos:
            block, pos = self._get_longest_match(lines, pos)
            if block is not None:
                # Check to see if we matched fewer than min_match_bytes; if
                # so, we will turn this into a pure 'insert' rather than a copy.
                # block[-1] is the number of lines. A quick check says if we
                # have more lines than min_match_bytes, then we know we have
                # enough bytes.
                if block[-1] < min_match_bytes:
                    # This block may be a 'short' block, check
                    old_start, new_start, range_len = block
                    matched_bytes = sum(map(len,
                        lines[new_start:new_start + range_len]))
                    if matched_bytes < min_match_bytes:
                        block = None
            if block is not None:
                result_append(block)
        result_append((len(self.lines), len(lines), 0))
        return result

    def extend_lines(self, lines, index):
        """Add more lines to the left-lines list.

        :param lines: A list of lines to add
        :param index: A True/False for each node to define if it should be
            indexed.
        """
        self._update_matching_lines(lines, index)
        self.lines.extend(lines)
        endpoint = self.endpoint
        for line in lines:
            endpoint += len(line)
            self.line_offsets.append(endpoint)
        if len(self.line_offsets) != len(self.lines):
            raise AssertionError('Somehow the line offset indicator'
                ' got out of sync with the line counter.')
        self.endpoint = endpoint

    def _flush_insert(self, start_linenum, end_linenum,
                      new_lines, out_lines, index_lines):
        """Add an 'insert' request to the data stream."""
        bytes_to_insert = ''.join(new_lines[start_linenum:end_linenum])
        insert_length = len(bytes_to_insert)
        # Each insert instruction is at most 127 bytes long
        for start_byte in xrange(0, insert_length, 127):
            insert_count = min(insert_length - start_byte, 127)
            out_lines.append(chr(insert_count))
            # Don't index the 'insert' instruction
            index_lines.append(False)
            insert = bytes_to_insert[start_byte:start_byte+insert_count]
            as_lines = osutils.split_lines(insert)
            out_lines.extend(as_lines)
            index_lines.extend([True]*len(as_lines))

    def _flush_copy(self, old_start_linenum, num_lines,
                    out_lines, index_lines):
        if old_start_linenum == 0:
            first_byte = 0
        else:
            first_byte = self.line_offsets[old_start_linenum - 1]
        stop_byte = self.line_offsets[old_start_linenum + num_lines - 1]
        num_bytes = stop_byte - first_byte
        # The data stream allows >64kB in a copy, but to match the compiled
        # code, we will also limit it to a 64kB copy
        for start_byte in xrange(first_byte, stop_byte, 64*1024):
            num_bytes = min(64*1024, stop_byte - start_byte)
            copy_bytes = encode_copy_instruction(start_byte, num_bytes)
            out_lines.append(copy_bytes)
            index_lines.append(False)

    def make_delta(self, new_lines, bytes_length=None, soft=False):
        """Compute the delta for this content versus the original content."""
        if bytes_length is None:
            bytes_length = sum(map(len, new_lines))
        # reserved for content type, content length
        out_lines = ['', '', encode_base128_int(bytes_length)]
        index_lines = [False, False, False]
        output_handler = _OutputHandler(out_lines, index_lines,
                                        self._MIN_MATCH_BYTES)
        blocks = self.get_matching_blocks(new_lines, soft=soft)
        current_line_num = 0
        # We either copy a range (where there are reusable lines) or we
        # insert new lines. To find reusable lines we traverse the matching
        # blocks computed above.
        for old_start, new_start, range_len in blocks:
            if new_start != current_line_num:
                # non-matching region, insert the content
                output_handler.add_insert(new_lines[current_line_num:new_start])
            current_line_num = new_start + range_len
            if range_len:
                # Convert the line based offsets into byte based offsets
                if old_start == 0:
                    first_byte = 0
                else:
                    first_byte = self.line_offsets[old_start - 1]
                last_byte = self.line_offsets[old_start + range_len - 1]
                output_handler.add_copy(first_byte, last_byte)
        return out_lines, index_lines
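

# An illustrative sketch (the helper below is only an example, not part of the
# original module): get_matching_blocks returns (old_start, new_start, length)
# tuples in line offsets, terminated by (old_len, new_len, 0).
def _demo_matching_blocks():
    index = LinesDeltaIndex(['a really long line 1\n', 'shared line two\n'])
    blocks = index.get_matching_blocks(
        ['a really long line 1\n', 'inserted\n', 'shared line two\n'])
    # Both original lines are found again, at new positions 0 and 2; the
    # inserted line has no match, and the final tuple marks the end point.
    assert blocks == [(0, 0, 1), (1, 2, 1), (2, 3, 0)]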


def encode_base128_int(val):
    """Convert an integer into a 7-bit lsb encoding."""
    bytes = []
    count = 0
    while val >= 0x80:
        bytes.append(chr((val | 0x80) & 0xFF))
        val >>= 7
    bytes.append(chr(val))
    return ''.join(bytes)


def decode_base128_int(bytes):
    """Decode an integer from a 7-bit lsb encoding."""
    offset = 0
    val = 0
    shift = 0
    bval = ord(bytes[offset])
    while bval >= 0x80:
        val |= (bval & 0x7F) << shift
        shift += 7
        offset += 1
        bval = ord(bytes[offset])
    val |= bval << shift
    offset += 1
    return val, offset
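

# An illustrative sketch (the helper below is only an example, not part of the
# original module): the base128 encoding stores 7 bits per byte, low bits
# first, with the high bit set on every byte except the last.
def _demo_base128():
    # 300 == 0b100101100: the low 7 bits are 0x2C (0xAC once the continuation
    # bit is set), and the remaining bits are 0x02.
    assert encode_base128_int(300) == '\xac\x02'
    # Decoding returns the value and the number of bytes consumed.
    assert decode_base128_int('\xac\x02') == (300, 2)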


def encode_copy_instruction(offset, length):
    """Convert this offset into a control code and bytes."""
    copy_command = 0x80
    copy_bytes = [None]

    for copy_bit in (0x01, 0x02, 0x04, 0x08):
        base_byte = offset & 0xff
        if base_byte:
            copy_command |= copy_bit
            copy_bytes.append(chr(base_byte))
        offset >>= 8
    if length is None:
        raise ValueError("cannot supply a length of None")
    if length > 0x10000:
        raise ValueError("we don't emit copy records for lengths > 64KiB")
    if length == 0:
        raise ValueError("We cannot emit a copy of length 0")
    if length != 0x10000:
        # A copy of length exactly 64*1024 == 0x10000 is sent as a length of 0,
        # since that saves bytes for large chained copies
        for copy_bit in (0x10, 0x20):
            base_byte = length & 0xff
            if base_byte:
                copy_command |= copy_bit
                copy_bytes.append(chr(base_byte))
            length >>= 8
    copy_bytes[0] = chr(copy_command)
    return ''.join(copy_bytes)


def decode_copy_instruction(bytes, cmd, pos):
    """Decode a copy instruction from the next few bytes.

    A copy instruction is a variable number of bytes, so we will parse the
    bytes we care about, and return the new position, as well as the offset and
    length referred to in the bytes.

    :param bytes: A string of bytes
    :param cmd: The command code
    :param pos: The position in bytes right after the copy command
    :return: (offset, length, newpos)
        The offset of the copy start, the number of bytes to copy, and the
        position after the last byte of the copy
    """
    if cmd & 0x80 != 0x80:
        raise ValueError('copy instructions must have bit 0x80 set')
    offset = 0
    length = 0
    if (cmd & 0x01):
        offset = ord(bytes[pos])
        pos += 1
    if (cmd & 0x02):
        offset = offset | (ord(bytes[pos]) << 8)
        pos += 1
    if (cmd & 0x04):
        offset = offset | (ord(bytes[pos]) << 16)
        pos += 1
    if (cmd & 0x08):
        offset = offset | (ord(bytes[pos]) << 24)
        pos += 1
    if (cmd & 0x10):
        length = ord(bytes[pos])
        pos += 1
    if (cmd & 0x20):
        length = length | (ord(bytes[pos]) << 8)
        pos += 1
    if (cmd & 0x40):
        length = length | (ord(bytes[pos]) << 16)
        pos += 1
    if length == 0:
        length = 65536
    return (offset, length, pos)
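

# An illustrative sketch (the helper below is only an example, not part of the
# original module): the command byte has 0x80 set, bits 0x01-0x08 flag which
# offset bytes follow, and bits 0x10 and up flag which length bytes follow
# (the encoder above only ever emits 0x10 and 0x20).
def _demo_copy_instruction():
    instruction = encode_copy_instruction(0x1234, 100)
    # Command 0x93 == 0x80 | 0x01 | 0x02 | 0x10: two offset bytes (0x34 then
    # 0x12, least significant first) and one length byte (0x64 == 100).
    assert instruction == '\x93\x34\x12\x64'
    cmd = ord(instruction[0])
    offset, length, newpos = decode_copy_instruction(instruction, cmd, 1)
    assert (offset, length, newpos) == (0x1234, 100, 4)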


def make_delta(source_bytes, target_bytes):
    """Create a delta from source to target."""
    if type(source_bytes) is not str:
        raise TypeError('source is not a str')
    if type(target_bytes) is not str:
        raise TypeError('target is not a str')
    line_locations = LinesDeltaIndex(osutils.split_lines(source_bytes))
    delta, _ = line_locations.make_delta(osutils.split_lines(target_bytes),
                                         bytes_length=len(target_bytes))
    return ''.join(delta)


def apply_delta(basis, delta):
    """Apply delta to this object to become new_version_id."""
    if type(basis) is not str:
        raise TypeError('basis is not a str')
    if type(delta) is not str:
        raise TypeError('delta is not a str')
    target_length, pos = decode_base128_int(delta)
    lines = []
    len_delta = len(delta)
    while pos < len_delta:
        cmd = ord(delta[pos])
        pos += 1
        if cmd & 0x80:
            offset, length, pos = decode_copy_instruction(delta, cmd, pos)
            last = offset + length
            if last > len(basis):
                raise ValueError('data would copy bytes past the'
                                 ' end of source')
            lines.append(basis[offset:last])
        else: # Insert of 'cmd' bytes
            if cmd == 0:
                raise ValueError('Command == 0 not supported yet')
            lines.append(delta[pos:pos+cmd])
            pos += cmd
    bytes = ''.join(lines)
    if len(bytes) != target_length:
        raise ValueError('Delta claimed to be %d long, but ended up'
                         ' %d long' % (target_length, len(bytes)))
    return bytes
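

# An illustrative sketch (the helper below is only an example, not part of the
# original module): a delta produced by make_delta starts with the base128
# encoded target length and can be re-applied to the source with apply_delta.
def _demo_delta_roundtrip():
    source = 'the quick brown fox\njumps over\nthe lazy dog\n'
    target = 'the quick brown fox\nleaps over\nthe lazy dog\n'
    delta = make_delta(source, target)
    # The first bytes of the delta encode len(target); the rest is a mix of
    # copy and insert instructions.
    length, _ = decode_base128_int(delta)
    assert length == len(target)
    assert apply_delta(source, delta) == target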


def apply_delta_to_source(source, delta_start, delta_end):
    """Extract a delta from source bytes, and apply it."""
    source_size = len(source)
    if delta_start >= source_size:
        raise ValueError('delta starts after source')
    if delta_end > source_size:
        raise ValueError('delta ends after source')
    if delta_start >= delta_end:
        raise ValueError('delta starts after it ends')
    delta_bytes = source[delta_start:delta_end]
    return apply_delta(source, delta_bytes)
