summaryrefslogtreecommitdiff
path: root/lib/subunit/python/subunit/chunked.py
blob: 82e4b0ddfc53a744acc45a09e611e511d6a64323 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#
#  subunit: extensions to python unittest to get test results from subprocesses.
#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
#
#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
#  license at the users choice. A copy of both licenses are available in the
#  project source as Apache-2.0 and BSD. You may not use this file except in
#  compliance with one of these two licences.
#  
#  Unless required by applicable law or agreed to in writing, software
#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
#  license you chose for the specific language governing permissions and
#  limitations under that license.
#

"""Encoder/decoder for http style chunked encoding."""

class Decoder(object):
    """Decode chunked content to a byte stream.

    The input is a sequence of chunks, each consisting of a length line
    (hexadecimal digits terminated by CRLF) followed by that many bytes of
    body. A chunk of length zero marks the end of the stream.

    The decoder is a small state machine: ``self.state`` is always one of
    ``_read_length``, ``_read_body`` or ``_finished`` and is invoked after
    every write.
    """

    # The only characters that may appear in a chunk length line.
    _header_chars = "0123456789abcdefABCDEF\r\n"

    def __init__(self, output):
        """Create a decoder decoding to output.

        :param output: A file-like object. Bytes written to the Decoder are
            decoded to strip off the chunking and written to the output.
            Up to a full write worth of data or a single control line may be
            buffered (whichever is larger). The close method should be called
            when no more data is available, to detect short streams; the
            write method will return non-None when the end of a stream is
            detected.
        """
        self.output = output
        # Data received but not yet consumed, in arrival order.
        self.buffered_bytes = []
        self.state = self._read_length
        # Number of body bytes still expected for the current chunk.
        self.body_length = 0

    def close(self):
        """Close the decoder.

        :raises ValueError: If the stream is incomplete ValueError is raised.
        """
        if self.state != self._finished:
            raise ValueError("incomplete stream")

    def _finished(self):
        """Finished reading, return any remaining bytes.

        :raises ValueError: If there are no buffered bytes to hand back,
            i.e. nothing was supplied after the end of stream marker.
        :returns: All buffered bytes joined together; the buffer is emptied.
        """
        if self.buffered_bytes:
            buffered_bytes = self.buffered_bytes
            self.buffered_bytes = []
            return ''.join(buffered_bytes)
        else:
            raise ValueError("stream is finished")

    def _discard(self, count):
        """Drop the first count characters from the buffered data."""
        while count:
            head = self.buffered_bytes[0]
            if count >= len(head):
                count -= len(head)
                del self.buffered_bytes[0]
            else:
                self.buffered_bytes[0] = head[count:]
                count = 0

    def _read_body(self):
        """Pass body bytes to the output.

        :returns: None while the current chunk is still incomplete,
            otherwise whatever the next state returns.
        """
        while self.body_length and self.buffered_bytes:
            head = self.buffered_bytes[0]
            if len(head) <= self.body_length:
                # The whole buffer belongs to the current chunk.
                self.output.write(head)
                self.body_length -= len(head)
                del self.buffered_bytes[0]
            else:
                # The buffer holds the end of this chunk plus later data.
                self.output.write(head[:self.body_length])
                self.buffered_bytes[0] = head[self.body_length:]
                self.body_length = 0
        if self.body_length:
            # Ran out of data before the chunk ended; wait for more.
            return None
        # Chunk complete: always carry on with the next length line so that
        # data already buffered is not left unprocessed.
        self.state = self._read_length
        return self.state()

    def _read_length(self):
        """Try to decode a length from the bytes.

        :raises ValueError: If the buffered data cannot be the start of a
            valid length line (hexadecimal digits followed by CRLF).
        :returns: None if the length line is not complete yet, otherwise
            whatever the next state returns ('' when the end of stream
            marker was read and no excess data is buffered).
        """
        header_chars = []
        complete = False
        for chunk in self.buffered_bytes:
            for char in chunk:
                if char not in self._header_chars:
                    # Nothing that follows can make this a valid header, so
                    # fail now instead of buffering garbage indefinitely.
                    raise ValueError("chunk header invalid: %r"
                        % (''.join(header_chars) + char))
                header_chars.append(char)
                if char == '\n':
                    complete = True
                    break
            if complete:
                # Anything after the newline belongs to the chunk body.
                break
        if not complete:
            return None
        header = ''.join(header_chars)
        digits = header[:-2]
        # int() tolerates surrounding whitespace, so stray CRs have to be
        # rejected explicitly.
        if header[-2:] != '\r\n' or not digits or '\r' in digits:
            raise ValueError("chunk header invalid: %r" % header)
        self.body_length = int(digits, 16)
        self._discard(len(header))
        if self.body_length:
            self.state = self._read_body
            return self.state()
        # A zero length chunk is the end of stream marker.
        self.state = self._finished
        if not self.buffered_bytes:
            # May not call into self._finished with no buffered data.
            return ''
        return self.state()

    def write(self, bytes):
        """Decode bytes to the output stream.

        :param bytes: The next piece of the chunked stream. An empty value
            is not buffered but the current state is still run.
        :raises ValueError: If a chunk length line is malformed, or if an
            empty write is made after the stream has already seen the end
            of file marker.
        :returns: None, or the excess bytes beyond the end of file marker
            ('' when the marker was seen with nothing following it).
        """
        if bytes:
            self.buffered_bytes.append(bytes)
        return self.state()


class Encoder(object):
    """Write content to a stream as a series of HTTP-style chunks.

    Each chunk is emitted as a hexadecimal length line terminated by CRLF
    followed by the chunk data; a zero length line ends the stream.
    """

    def __init__(self, output):
        """Create an encoder encoding to output.

        :param output: A file-like object that receives the chunked data.
            Small writes are held back and coalesced into larger chunks, so
            ``close`` must be called to push out the remainder and terminate
            the stream.
        """
        self.output = output
        # Pending writes and their combined length.
        self.buffered_bytes = []
        self.buffer_size = 0

    def flush(self, extra_len=0):
        """Emit everything buffered so far as the start of one chunk.

        :param extra_len: Number of additional bytes the caller will write
            to the output itself straight afterwards; the chunk length line
            is enlarged by this amount to cover them.
        :returns: None when there was nothing to emit, True otherwise.
        """
        pending_size = self.buffer_size
        if not (pending_size or extra_len):
            # An empty chunk would read as the end of stream marker.
            return
        pending, self.buffered_bytes = self.buffered_bytes, []
        self.buffer_size = 0
        self.output.write("%X\r\n" % (pending_size + extra_len))
        if pending_size:
            self.output.write(''.join(pending))
        return True

    def write(self, bytes):
        """Encode bytes to the output stream.

        Data is buffered until the pending total would reach 65536 bytes;
        at that point the buffer and the new data go out as a single chunk.
        """
        bytes_len = len(bytes)
        if self.buffer_size + bytes_len < 65536:
            self.buffered_bytes.append(bytes)
            self.buffer_size += bytes_len
            return
        # Size the chunk for buffer + new data, then write the new data
        # directly rather than copying it through the buffer.
        self.flush(bytes_len)
        self.output.write(bytes)

    def close(self):
        """Finish the stream. This does not close the output stream."""
        self.flush()
        self.output.write("0\r\n")