summaryrefslogtreecommitdiffstats
path: root/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py')
-rw-r--r--chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py118
1 files changed, 72 insertions, 46 deletions
diff --git a/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py b/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py
index cae045a2825..a8a49e3c3dc 100644
--- a/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py
+++ b/chromium/third_party/pywebsocket/src/mod_pywebsocket/_stream_hybi.py
@@ -280,7 +280,7 @@ def parse_frame(receive_bytes, logger=None,
if logger.isEnabledFor(common.LOGLEVEL_FINE):
unmask_start = time.time()
- bytes = masker.mask(raw_payload_bytes)
+ unmasked_bytes = masker.mask(raw_payload_bytes)
if logger.isEnabledFor(common.LOGLEVEL_FINE):
logger.log(
@@ -288,7 +288,7 @@ def parse_frame(receive_bytes, logger=None,
'Done unmasking payload data at %s MB/s',
payload_length / (time.time() - unmask_start) / 1000 / 1000)
- return opcode, bytes, fin, rsv1, rsv2, rsv3
+ return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
class FragmentedFrameBuilder(object):
@@ -309,7 +309,7 @@ class FragmentedFrameBuilder(object):
# frames in the message are all the same.
self._opcode = common.OPCODE_TEXT
- def build(self, message, end, binary):
+ def build(self, payload_data, end, binary):
if binary:
frame_type = common.OPCODE_BINARY
else:
@@ -332,10 +332,10 @@ class FragmentedFrameBuilder(object):
if binary or not self._encode_utf8:
return create_binary_frame(
- message, opcode, fin, self._mask, self._frame_filters)
+ payload_data, opcode, fin, self._mask, self._frame_filters)
else:
return create_text_frame(
- message, opcode, fin, self._mask, self._frame_filters)
+ payload_data, opcode, fin, self._mask, self._frame_filters)
def _create_control_frame(opcode, body, mask, frame_filters):
@@ -389,9 +389,6 @@ class StreamOptions(object):
def __init__(self):
"""Constructs StreamOptions."""
- # Enables deflate-stream extension.
- self.deflate_stream = False
-
# Filters applied to frames.
self.outgoing_frame_filters = []
self.incoming_frame_filters = []
@@ -403,9 +400,6 @@ class StreamOptions(object):
self.encode_text_message_to_utf8 = True
self.mask_send = False
self.unmask_receive = True
- # RFC6455 disallows fragmented control frames, but mux extension
- # relaxes the restriction.
- self.allow_fragmented_control_frame = False
class Stream(StreamBase):
@@ -426,10 +420,6 @@ class Stream(StreamBase):
self._options = options
- if self._options.deflate_stream:
- self._logger.debug('Setup filter for deflate-stream')
- self._request = util.DeflateRequest(self._request)
-
self._request.client_terminated = False
self._request.server_terminated = False
@@ -463,10 +453,36 @@ class Stream(StreamBase):
unmask_receive=self._options.unmask_receive)
def _receive_frame_as_frame_object(self):
- opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
+ opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
- opcode=opcode, payload=bytes)
+ opcode=opcode, payload=unmasked_bytes)
+
+ def receive_filtered_frame(self):
+ """Receives a frame and applies frame filters and message filters.
+ The frame to be received must satisfy following conditions:
+ - The frame is not fragmented.
+ - The opcode of the frame is TEXT or BINARY.
+
+ DO NOT USE this method except for testing purpose.
+ """
+
+ frame = self._receive_frame_as_frame_object()
+ if not frame.fin:
+ raise InvalidFrameException(
+ 'Segmented frames must not be received via '
+ 'receive_filtered_frame()')
+ if (frame.opcode != common.OPCODE_TEXT and
+ frame.opcode != common.OPCODE_BINARY):
+ raise InvalidFrameException(
+ 'Control frames must not be received via '
+ 'receive_filtered_frame()')
+
+ for frame_filter in self._options.incoming_frame_filters:
+ frame_filter.filter(frame)
+ for message_filter in self._options.incoming_message_filters:
+ frame.payload = message_filter.filter(frame.payload)
+ return frame
def send_message(self, message, end=True, binary=False):
"""Send message.
@@ -493,7 +509,35 @@ class Stream(StreamBase):
message = message_filter.filter(message, end, binary)
try:
- self._write(self._writer.build(message, end, binary))
+ # Set this to any positive integer to limit maximum size of data in
+ # payload data of each frame.
+ MAX_PAYLOAD_DATA_SIZE = -1
+
+ if MAX_PAYLOAD_DATA_SIZE <= 0:
+ self._write(self._writer.build(message, end, binary))
+ return
+
+ bytes_written = 0
+ while True:
+ end_for_this_frame = end
+ bytes_to_write = len(message) - bytes_written
+ if (MAX_PAYLOAD_DATA_SIZE > 0 and
+ bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
+ end_for_this_frame = False
+ bytes_to_write = MAX_PAYLOAD_DATA_SIZE
+
+ frame = self._writer.build(
+ message[bytes_written:bytes_written + bytes_to_write],
+ end_for_this_frame,
+ binary)
+ self._write(frame)
+
+ bytes_written += bytes_to_write
+
+ # This if must be placed here (the end of while block) so that
+ # at least one frame is sent.
+ if len(message) <= bytes_written:
+ break
except ValueError, e:
raise BadOperationException(e)
@@ -548,8 +592,7 @@ class Stream(StreamBase):
else:
# Start of fragmentation frame
- if (not self._options.allow_fragmented_control_frame and
- common.is_control_opcode(frame.opcode)):
+ if common.is_control_opcode(frame.opcode):
raise InvalidFrameException(
'Control frames must not be fragmented')
@@ -593,8 +636,9 @@ class Stream(StreamBase):
self._request.ws_close_code,
self._request.ws_close_reason)
- # Drain junk data after the close frame if necessary.
- self._drain_received_data()
+ # As we've received a close frame, no more data is coming over the
+ # socket. We can now safely close the socket without worrying about
+ # RST sending.
if self._request.server_terminated:
self._logger.debug(
@@ -618,7 +662,7 @@ class Stream(StreamBase):
reason = ''
self._send_closing_handshake(code, reason)
self._logger.debug(
- 'Sent ack for client-initiated closing handshake '
+ 'Acknowledged closing handshake initiated by the peer '
'(code=%r, reason=%r)', code, reason)
def _process_ping_message(self, message):
@@ -761,13 +805,15 @@ class Stream(StreamBase):
self._write(frame)
- def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
+ def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason='',
+ wait_response=True):
"""Closes a WebSocket connection.
Args:
code: Status code for close frame. If code is None, a close
frame with empty body will be sent.
reason: string representing close reason.
+ wait_response: True when caller want to wait the response.
Raises:
BadOperationException: when reason is specified with code None
or reason is not an instance of both str and unicode.
@@ -790,11 +836,11 @@ class Stream(StreamBase):
self._send_closing_handshake(code, reason)
self._logger.debug(
- 'Sent server-initiated closing handshake (code=%r, reason=%r)',
+ 'Initiated closing handshake (code=%r, reason=%r)',
code, reason)
if (code == common.STATUS_GOING_AWAY or
- code == common.STATUS_PROTOCOL_ERROR):
+ code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
# It doesn't make sense to wait for a close frame if the reason is
# protocol error or that the server is going away. For some of
# other reasons, it might not make sense to wait for a close frame,
@@ -837,25 +883,5 @@ class Stream(StreamBase):
return self._original_opcode
- def _drain_received_data(self):
- """Drains unread data in the receive buffer to avoid sending out TCP
- RST packet. This is because when deflate-stream is enabled, some
- DEFLATE block for flushing data may follow a close frame. If any data
- remains in the receive buffer of a socket when the socket is closed,
- it sends out TCP RST packet to the other peer.
-
- Since mod_python's mp_conn object doesn't support non-blocking read,
- we perform this only when pywebsocket is running in standalone mode.
- """
-
- # If self._options.deflate_stream is true, self._request is
- # DeflateRequest, so we can get wrapped request object by
- # self._request._request.
- #
- # Only _StandaloneRequest has _drain_received_data method.
- if (self._options.deflate_stream and
- ('_drain_received_data' in dir(self._request._request))):
- self._request._request._drain_received_data()
-
# vi:sts=4 sw=4 et