Source code for interactive_python.encoding

from abc import abstractmethod
from gzip import GzipFile
import varint
import io
import zlib


[docs]class EncodingException(Exception): """An EncodingException is raised if an error occurs in an encoding or decoding algorithm. Raising this exception triggers a fallback to plain text encoding. """ pass
[docs]class Encoding: """Encoding is an abstract class that defines methods for decoding incoming and encoding outgoing websocket calls. Both encode() and decode() are allowed to raise EncodingExceptions, which will trigger a fallback to plain-text encoding. """
[docs] @abstractmethod def name(self): pass
[docs] @abstractmethod def encode(self, data): """ encode takes a string of data and returns its encoded form """ pass
[docs] @abstractmethod def decode(self, data): """ decode takes a byte slice of data and returns it decoded, string form """ pass
def reset_buffer(buffer, value=None): buffer.truncate(0) buffer.seek(0) if value is not None: buffer.write(value)
[docs]def reset_buffer(buffer, value=None): buffer.truncate(0) buffer.seek(0) if value is not None: buffer.write(value)
[docs]class TextEncoding(Encoding):
[docs] def name(self): return 'text'
[docs] def encode(self, data): return data
[docs] def decode(self, data): return data
[docs]class GzipEncoding(Encoding): def __init__(self, compression_level=6): super() self._encoder_buffer = io.BytesIO() self._encoder = None self._decoder_buffer = io.BytesIO() self._decoder = zlib.decompressobj(16 + zlib.MAX_WBITS) self._compression_level = compression_level
[docs] def name(self): return 'gzip'
[docs] def encode(self, data): data = data.encode('utf-8') self._encoder_buffer.write(varint.encode(len(data))) # Don't initialize the encoder before the first call to encode(), since # it writes the gzip header immediately and we need to insert the # message length prior to that happening. if self._encoder is None: self._encoder = GzipFile(fileobj=self._encoder_buffer, mode='wb', compresslevel=self._compression_level) self._encoder.write(data) self._encoder.flush() output = self._encoder_buffer.getvalue() reset_buffer(self._encoder_buffer) return output
[docs] def decode(self, data): # Decode the varuint prefix off the data first, then smash the remaining # data into the decode buffer and reset it to read any previous tail. prefix_stream = io.BytesIO(data) decoded_bytes = varint.decode_stream(prefix_stream) self._decoder_buffer.write(data[prefix_stream.tell():]) self._decoder_buffer.seek(0) decoded_data = self._decoder.decompress( self._decoder_buffer.getbuffer(), decoded_bytes) reset_buffer(self._decoder_buffer, self._decoder.unconsumed_tail) return decoded_data.decode('utf-8')