summaryrefslogtreecommitdiffstats
path: root/chromium/content/browser/byte_stream.h
blob: 09ba51c213d1ddb86bf1146dbd2ed5799c275d95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CONTENT_BROWSER_BYTE_STREAM_H_
#define CONTENT_BROWSER_BYTE_STREAM_H_

#include <stddef.h>

#include <memory>

#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/task/post_task.h"
#include "content/common/content_export.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
#include "net/base/io_buffer.h"

namespace base {
class SequencedTaskRunner;
}

namespace content {

// A byte stream is a pipe to transfer bytes between a source and a
// sink, which may be on different threads.  It is intended to be the
// only connection between source and sink; they need have no
// direct awareness of each other aside from the byte stream.  The source and
// the sink have different interfaces to a byte stream, |ByteStreamWriter|
// and |ByteStreamReader|.  A pair of connected interfaces is generated by
// calling |CreateByteStream|.
//
// The source adds bytes to the bytestream via |ByteStreamWriter::Write|
// and the sink retrieves bytes already written via |ByteStreamReader::Read|.
//
// When the source has no more data to add, it will call
// |ByteStreamWriter::Close| to indicate that.  Operation status at the source
// is indicated to the sink via an int passed to the Close() method and returned
// from the GetStatus() method. Source and sink must agree on the interpretation
// of this int.
//
// Normally the source is not managed after the relationship is setup;
// it is expected to provide data and then close itself.  If an error
// occurs on the sink, it is not signalled to the source via this
// mechanism; instead, the source will write data until it exhausts the
// available space.  If the source needs to be aware of errors occurring
// on the sink, this must be signalled in some other fashion (usually
// through whatever controller setup the relationship).
//
// Callback lifetime management: No lifetime management is done in this
// class to prevent registered callbacks from being called after any
// objects to which they may refer have been destroyed.  It is the
// responsibility of the callers to avoid use-after-free references.
// This may be done by any of several mechanisms, including weak
// pointers, scoped_refptr references, or calling the registration
// function with a null callback from a destructor.  To enable the null
// callback strategy, callbacks will not be stored between retrieval and
// evaluation, so setting a null callback will guarantee that the
// previous callback will not be executed after setting.
//
// Class methods are virtual to allow mocking for tests; these classes
// aren't intended to be base classes for other classes.
//
// Sample usage (note that this does not show callback usage):
//
//    void OriginatingClass::Initialize() {
//      // Create a stream for sending bytes from IO->FILE threads.
//      std::unique_ptr<ByteStreamWriter> writer;
//      std::unique_ptr<ByteStreamReader> reader;
//      CreateByteStream(
//          GetIOThreadTaskRunner({}),
//          base::CreateSequencedTaskRunner({base::MayBlock, ...}),
//          kStreamBufferSize /* e.g. 10240.  */,
//          &writer,
//          &reader);         // Presumed passed to FILE thread for reading.
//
//      // Setup callback for writing.
//      writer->RegisterCallback(base::BindRepeating(&SpaceAvailable, this));
//
//      // Do initial round of writing.
//      SpaceAvailable();
//    }
//
//    // May only be run on first argument task runner, in this case the IO
//    // thread.
//    void OriginatingClass::SpaceAvailable() {
//      while (<data available>) {
//        scoped_refptr<net::IOBuffer> buffer;
//        size_t buffer_length;
//        // Create IOBuffer, fill in with data, and set buffer_length.
//        if (!writer->Write(buffer, buffer_length)) {
//          // No more space; return and we'll be called again
//          // when there is space.
//          return;
//        }
//      }
//      writer->Close(<operation status>);
//      writer.reset();
//    }
//
//    // On File thread; containing class setup not shown.
//
//    void ReceivingClass::Initialize() {
//      // Initialization
//      reader->RegisterCallback(base::BindRepeating(&DataAvailable, obj));
//    }
//
//    // Called whenever there's something to read.
//    void ReceivingClass::DataAvailable() {
//      scoped_refptr<net::IOBuffer> data;
//      size_t length = 0;
//      ByteStreamReader::StreamState state;
//
//      while (ByteStreamReader::STREAM_HAS_DATA ==
//             (state = reader->Read(&data, &length))) {
//        // Process |data|.
//      }
//
//      if (ByteStreamReader::STREAM_COMPLETE == state) {
//        int status = reader->GetStatus();
//        // Process error or successful completion in |status|.
//      }
//
//      // if |state| is STREAM_EMPTY, we're done for now; we'll be called
//      // again when there's more data.
//    }
//
// ByteStreamWriter is the source-side (producer) interface of a byte stream.
// Every method is pure virtual; no implementation is declared in this header.
class CONTENT_EXPORT ByteStreamWriter {
 public:
  // Inverse of the fraction of the stream buffer that must be full before
  // a notification is sent to the paired Reader that there's more data
  // (i.e. a value of N stands for 1/N of the buffer).  The value itself is
  // defined outside this header.
  static const int kFractionBufferBeforeSending;

  // Pure virtual so that the interface itself cannot be instantiated.  Note
  // that C++ still requires a pure virtual destructor to have a definition;
  // that definition is not in this header.
  virtual ~ByteStreamWriter() = 0;

  // Always adds the data passed into the ByteStream, even if doing so
  // exceeds the class limit on data.  Returns true if more data may be
  // added without exceeding that limit, and false once the stream is full
  // (see RegisterCallback() for how to learn that space is available again).
  // Takes ownership of (a reference to) |buffer|.  |byte_count| is the
  // length of the data held in |buffer|.
  virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
                     size_t byte_count) = 0;

  // Flushes contents buffered in this writer to the corresponding reader,
  // regardless of whether the buffer filling rate is greater than
  // kFractionBufferBeforeSending or not.  Does nothing if there are no
  // contents buffered.
  virtual void Flush() = 0;

  // Signals that all data that is going to be sent has been sent, and
  // provides a status.  The meaning of |status| is a private agreement
  // between source and sink; the sink retrieves it through
  // ByteStreamReader::GetStatus().
  virtual void Close(int status) = 0;

  // Registers a callback to be called when the stream transitions from
  // full to having space available.  The callback will always be
  // called on the task runner associated with the ByteStreamWriter.
  // This callback will only be called if a call to Write has previously
  // returned false (i.e. the ByteStream has been filled).
  // Multiple calls to this function are supported, though note that it
  // is the caller's responsibility to handle races with space becoming
  // available (i.e. in the case of that race either of the before
  // or after callbacks may be called).
  // The callback will not be called after ByteStreamWriter destruction.
  virtual void RegisterCallback(base::RepeatingClosure source_callback) = 0;

  // Returns the number of bytes sent to the reader but not yet reported by
  // the reader as read.
  virtual size_t GetTotalBufferedBytes() const = 0;
};

// ByteStreamReader is the sink-side (consumer) interface of a byte stream.
// Every method is pure virtual; no implementation is declared in this header.
class CONTENT_EXPORT ByteStreamReader {
 public:
  // Inverse of the fraction of the stream buffer that must be empty before
  // a notification is sent to the paired Writer that there's more room
  // (i.e. a value of N stands for 1/N of the buffer).  The value itself is
  // defined outside this header.
  static const int kFractionReadBeforeWindowUpdate;

  // Result of a Read() call:
  //   STREAM_EMPTY    - no data is available and Close() has not been called.
  //   STREAM_HAS_DATA - data was available and has been handed to the caller.
  //   STREAM_COMPLETE - no data is available and Close() has been called.
  enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE };

  // Pure virtual so that the interface itself cannot be instantiated.  Note
  // that C++ still requires a pure virtual destructor to have a definition;
  // that definition is not in this header.
  virtual ~ByteStreamReader() = 0;

  // Returns STREAM_EMPTY if there is no data on the ByteStream and
  // Close() has not been called, and STREAM_COMPLETE if there
  // is no data on the ByteStream and Close() has been called.
  // If there is data on the ByteStream, returns STREAM_HAS_DATA
  // and fills in |*data| with a pointer to the data, and |*length|
  // with its length.
  virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
                           size_t* length) = 0;

  // Returns the status int that the source passed to
  // ByteStreamWriter::Close().  Only valid to call if Read() has returned
  // STREAM_COMPLETE.
  virtual int GetStatus() const = 0;

  // Registers a callback to be called when data is added or the source
  // completes.  The callback will always be called on the owning
  // task runner.  Multiple calls to this function are supported,
  // though note that it is the caller's responsibility to handle races
  // with data becoming available (i.e. in the case of that race
  // either of the before or after callbacks may be called).
  // The callback will not be called after ByteStreamReader destruction.
  virtual void RegisterCallback(base::RepeatingClosure sink_callback) = 0;
};

// Generates a pair of connected byte stream interfaces: the writer (source
// end) is stored in |*input| and the reader (sink end) in |*output|.
//
// |input_task_runner| is the task runner on which the writer is used and on
// which its registered callback is run.
// |output_task_runner| is presumably the corresponding ("owning") task runner
// for the reader -- confirm against the implementation.
// |buffer_size| is the size of the stream buffer (e.g. 10240); presumably a
// byte count that determines when Write() starts returning false -- confirm
// against the implementation.
CONTENT_EXPORT void CreateByteStream(
    scoped_refptr<base::SequencedTaskRunner> input_task_runner,
    scoped_refptr<base::SequencedTaskRunner> output_task_runner,
    size_t buffer_size,
    std::unique_ptr<ByteStreamWriter>* input,
    std::unique_ptr<ByteStreamReader>* output);

}  // namespace content

#endif  // CONTENT_BROWSER_BYTE_STREAM_H_