diff options
Diffstat (limited to 'chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py')
-rw-r--r-- | chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py | 118 |
1 files changed, 72 insertions, 46 deletions
diff --git a/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py b/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py index cae045a2825..a8a49e3c3dc 100644 --- a/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py +++ b/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py @@ -280,7 +280,7 @@ def parse_frame(receive_bytes, logger=None, if logger.isEnabledFor(common.LOGLEVEL_FINE): unmask_start = time.time() - bytes = masker.mask(raw_payload_bytes) + unmasked_bytes = masker.mask(raw_payload_bytes) if logger.isEnabledFor(common.LOGLEVEL_FINE): logger.log( @@ -288,7 +288,7 @@ def parse_frame(receive_bytes, logger=None, 'Done unmasking payload data at %s MB/s', payload_length / (time.time() - unmask_start) / 1000 / 1000) - return opcode, bytes, fin, rsv1, rsv2, rsv3 + return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 class FragmentedFrameBuilder(object): @@ -309,7 +309,7 @@ class FragmentedFrameBuilder(object): # frames in the message are all the same. self._opcode = common.OPCODE_TEXT - def build(self, message, end, binary): + def build(self, payload_data, end, binary): if binary: frame_type = common.OPCODE_BINARY else: @@ -332,10 +332,10 @@ class FragmentedFrameBuilder(object): if binary or not self._encode_utf8: return create_binary_frame( - message, opcode, fin, self._mask, self._frame_filters) + payload_data, opcode, fin, self._mask, self._frame_filters) else: return create_text_frame( - message, opcode, fin, self._mask, self._frame_filters) + payload_data, opcode, fin, self._mask, self._frame_filters) def _create_control_frame(opcode, body, mask, frame_filters): @@ -389,9 +389,6 @@ class StreamOptions(object): def __init__(self): """Constructs StreamOptions.""" - # Enables deflate-stream extension. - self.deflate_stream = False - # Filters applied to frames. self.outgoing_frame_filters = [] self.incoming_frame_filters = [] @@ -403,9 +400,6 @@ class StreamOptions(object): self.encode_text_message_to_utf8 = True self.mask_send = False self.unmask_receive = True - # RFC6455 disallows fragmented control frames, but mux extension - # relaxes the restriction. - self.allow_fragmented_control_frame = False class Stream(StreamBase): @@ -426,10 +420,6 @@ class Stream(StreamBase): self._options = options - if self._options.deflate_stream: - self._logger.debug('Setup filter for deflate-stream') - self._request = util.DeflateRequest(self._request) - self._request.client_terminated = False self._request.server_terminated = False @@ -463,10 +453,36 @@ class Stream(StreamBase): unmask_receive=self._options.unmask_receive) def _receive_frame_as_frame_object(self): - opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame() + opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame() return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3, - opcode=opcode, payload=bytes) + opcode=opcode, payload=unmasked_bytes) + + def receive_filtered_frame(self): + """Receives a frame and applies frame filters and message filters. + The frame to be received must satisfy following conditions: + - The frame is not fragmented. + - The opcode of the frame is TEXT or BINARY. + + DO NOT USE this method except for testing purpose. + """ + + frame = self._receive_frame_as_frame_object() + if not frame.fin: + raise InvalidFrameException( + 'Segmented frames must not be received via ' + 'receive_filtered_frame()') + if (frame.opcode != common.OPCODE_TEXT and + frame.opcode != common.OPCODE_BINARY): + raise InvalidFrameException( + 'Control frames must not be received via ' + 'receive_filtered_frame()') + + for frame_filter in self._options.incoming_frame_filters: + frame_filter.filter(frame) + for message_filter in self._options.incoming_message_filters: + frame.payload = message_filter.filter(frame.payload) + return frame def send_message(self, message, end=True, binary=False): """Send message. @@ -493,7 +509,35 @@ class Stream(StreamBase): message = message_filter.filter(message, end, binary) try: - self._write(self._writer.build(message, end, binary)) + # Set this to any positive integer to limit maximum size of data in + # payload data of each frame. + MAX_PAYLOAD_DATA_SIZE = -1 + + if MAX_PAYLOAD_DATA_SIZE <= 0: + self._write(self._writer.build(message, end, binary)) + return + + bytes_written = 0 + while True: + end_for_this_frame = end + bytes_to_write = len(message) - bytes_written + if (MAX_PAYLOAD_DATA_SIZE > 0 and + bytes_to_write > MAX_PAYLOAD_DATA_SIZE): + end_for_this_frame = False + bytes_to_write = MAX_PAYLOAD_DATA_SIZE + + frame = self._writer.build( + message[bytes_written:bytes_written + bytes_to_write], + end_for_this_frame, + binary) + self._write(frame) + + bytes_written += bytes_to_write + + # This if must be placed here (the end of while block) so that + # at least one frame is sent. + if len(message) <= bytes_written: + break except ValueError, e: raise BadOperationException(e) @@ -548,8 +592,7 @@ class Stream(StreamBase): else: # Start of fragmentation frame - if (not self._options.allow_fragmented_control_frame and - common.is_control_opcode(frame.opcode)): + if common.is_control_opcode(frame.opcode): raise InvalidFrameException( 'Control frames must not be fragmented') @@ -593,8 +636,9 @@ class Stream(StreamBase): self._request.ws_close_code, self._request.ws_close_reason) - # Drain junk data after the close frame if necessary. - self._drain_received_data() + # As we've received a close frame, no more data is coming over the + # socket. We can now safely close the socket without worrying about + # RST sending. if self._request.server_terminated: self._logger.debug( @@ -618,7 +662,7 @@ class Stream(StreamBase): reason = '' self._send_closing_handshake(code, reason) self._logger.debug( - 'Sent ack for client-initiated closing handshake ' + 'Acknowledged closing handshake initiated by the peer ' '(code=%r, reason=%r)', code, reason) def _process_ping_message(self, message): @@ -761,13 +805,15 @@ class Stream(StreamBase): self._write(frame) - def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''): + def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason='', + wait_response=True): """Closes a WebSocket connection. Args: code: Status code for close frame. If code is None, a close frame with empty body will be sent. reason: string representing close reason. + wait_response: True when caller want to wait the response. Raises: BadOperationException: when reason is specified with code None or reason is not an instance of both str and unicode. @@ -790,11 +836,11 @@ class Stream(StreamBase): self._send_closing_handshake(code, reason) self._logger.debug( - 'Sent server-initiated closing handshake (code=%r, reason=%r)', + 'Initiated closing handshake (code=%r, reason=%r)', code, reason) if (code == common.STATUS_GOING_AWAY or - code == common.STATUS_PROTOCOL_ERROR): + code == common.STATUS_PROTOCOL_ERROR) or not wait_response: # It doesn't make sense to wait for a close frame if the reason is # protocol error or that the server is going away. For some of # other reasons, it might not make sense to wait for a close frame, @@ -837,25 +883,5 @@ class Stream(StreamBase): return self._original_opcode - def _drain_received_data(self): - """Drains unread data in the receive buffer to avoid sending out TCP - RST packet. This is because when deflate-stream is enabled, some - DEFLATE block for flushing data may follow a close frame. If any data - remains in the receive buffer of a socket when the socket is closed, - it sends out TCP RST packet to the other peer. - - Since mod_python's mp_conn object doesn't support non-blocking read, - we perform this only when pywebsocket is running in standalone mode. - """ - - # If self._options.deflate_stream is true, self._request is - # DeflateRequest, so we can get wrapped request object by - # self._request._request. - # - # Only _StandaloneRequest has _drain_received_data method. - if (self._options.deflate_stream and - ('_drain_received_data' in dir(self._request._request))): - self._request._request._drain_received_data() - # vi:sts=4 sw=4 et |