summaryrefslogtreecommitdiffstats
path: root/mojo/system/channel.h
blob: cea041fd4460c70190756b5de7a9cbd1e8836ebe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef MOJO_SYSTEM_CHANNEL_H_
#define MOJO_SYSTEM_CHANNEL_H_

#include <stdint.h>

#include "base/compiler_specific.h"
#include "base/containers/hash_tables.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/strings/string_piece.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_checker.h"
#include "mojo/embedder/scoped_platform_handle.h"
#include "mojo/public/c/system/types.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/raw_channel.h"
#include "mojo/system/system_impl_export.h"

namespace mojo {

namespace embedder {
class PlatformSupport;
}

namespace system {

// This class is mostly thread-safe. It must be created on an I/O thread.
// |Init()| must be called on that same thread before it becomes thread-safe (in
// particular, before references are given to any other thread) and |Shutdown()|
// must be called on that same thread before destruction. Its public methods are
// otherwise thread-safe. It may be destroyed on any thread, in the sense that
// the last reference to it may be released on any thread, with the proviso that
// |Shutdown()| must have been called first (so the pattern is that a "main"
// reference is kept on its creation thread and is released after |Shutdown()|
// is called, but other threads may have temporarily "dangling" references).
//
// Note that |MessagePipe| calls into |Channel| and the former's |lock_| must be
// acquired before the latter's. When |Channel| wants to call into a
// |MessagePipe|, it must obtain a reference to the |MessagePipe| (from
// |local_id_to_endpoint_info_map_|) under |Channel::lock_| and then release the
// lock.
//
// Also, care must be taken with respect to references: While a |Channel| has
// references to |MessagePipe|s, |MessagePipe|s (via |ProxyMessagePipeEndpoint|)
// may also have references to |Channel|s. These references are set up by
// calling |AttachMessagePipeEndpoint()|. The reference to |MessagePipe| owned
// by |Channel| must be removed by calling |DetachMessagePipeEndpoint()| (which
// is done by |MessagePipe|/|ProxyMessagePipeEndpoint|, which simultaneously
// removes its reference to |Channel|).
class MOJO_SYSTEM_IMPL_EXPORT Channel
    : public base::RefCountedThreadSafe<Channel>,
      public RawChannel::Delegate {
 public:
  // The first message pipe endpoint attached will have this as its local ID.
  static const MessageInTransit::EndpointId kBootstrapEndpointId = 1;

  // |platform_support| (typically owned by |Core|) must remain alive until
  // after |Shutdown()| is called.
  explicit Channel(embedder::PlatformSupport* platform_support);

  // This must be called on the creation thread before any other methods are
  // called, and before references to this object are given to any other
  // threads. |raw_channel| should be uninitialized. Returns true on success. On
  // failure, no other methods should be called (including |Shutdown()|).
  bool Init(scoped_ptr<RawChannel> raw_channel);

  // This must be called on the creation thread before destruction (which can
  // happen on any thread).
  void Shutdown();

  // Signals that |Shutdown()| will be called soon (this may be called from any
  // thread, unlike |Shutdown()|). Warnings will be issued if, e.g., messages
  // are written after this is called; other warnings may be suppressed. (This
  // may be called multiple times, or not at all.)
  void WillShutdownSoon();

  // Attaches the given message pipe/port's endpoint (which must be a
  // |ProxyMessagePipeEndpoint|) to this channel. This assigns it a local ID,
  // which it returns. The first message pipe endpoint attached will always have
  // |kBootstrapEndpointId| as its local ID. (For bootstrapping, this occurs on
  // both sides, so one should use |kBootstrapEndpointId| for the remote ID for
  // the first message pipe across a channel.) Returns |kInvalidEndpointId| on
  // failure.
  // TODO(vtl): Maybe limit the number of attached message pipes.
  MessageInTransit::EndpointId AttachMessagePipeEndpoint(
      scoped_refptr<MessagePipe> message_pipe,
      unsigned port);

  // Runs the message pipe with the given |local_id| (previously attached), with
  // the given |remote_id| (negotiated using some other means, e.g., over an
  // existing message pipe; see comments above for the bootstrap case). Returns
  // false on failure, in particular if no message pipe with |local_id| is
  // attached.
  bool RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
                              MessageInTransit::EndpointId remote_id);

  // Tells the other side of the channel to run a message pipe endpoint (which
  // must already be attached); |local_id| and |remote_id| are relative to this
  // channel (i.e., |local_id| is the other side's remote ID and |remote_id| is
  // its local ID).
  // TODO(vtl): Maybe we should just have a flag argument to
  // |RunMessagePipeEndpoint()| that tells it to do this.
  void RunRemoteMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
                                    MessageInTransit::EndpointId remote_id);

  // This forwards |message| verbatim to |raw_channel_|.
  bool WriteMessage(scoped_ptr<MessageInTransit> message);

  // See |RawChannel::IsWriteBufferEmpty()|.
  // TODO(vtl): Maybe we shouldn't expose this, and instead have a
  // |FlushWriteBufferAndShutdown()| or something like that.
  bool IsWriteBufferEmpty();

  // This removes the message pipe/port's endpoint (with the given local ID and
  // given remote ID, which should be |kInvalidEndpointId| if not yet running),
  // returned by |AttachMessagePipeEndpoint()| from this channel. After this is
  // called, |local_id| may be reused for another message pipe.
  void DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
                                 MessageInTransit::EndpointId remote_id);

  // See |RawChannel::GetSerializedPlatformHandleSize()|.
  size_t GetSerializedPlatformHandleSize() const;

  embedder::PlatformSupport* platform_support() const {
    return platform_support_;
  }

 private:
  // Terminology:
  //   - "Message pipe endpoint": In the implementation, a |MessagePipe| owns
  //     two |MessagePipeEndpoint| objects, one for each port. The
  //     |MessagePipeEndpoint| objects are only accessed via the |MessagePipe|
  //     (which has the lock), with the additional information of the port
  //     number. So as far as the channel is concerned, a message pipe endpoint
  //     is a pointer to a |MessagePipe| together with the port number.
  //       - The value of |port| in |EndpointInfo| refers to the
  //         |ProxyMessagePipeEndpoint| (i.e., the endpoint that is logically on
  //         the other side). Messages received by a channel for a message pipe
  //         are thus written to the *peer* of this port.
  //   - "Attached"/"detached": A message pipe endpoint is attached to a channel
  //     if it has a pointer to it. It must be detached before the channel gives
  //     up its pointer to it in order to break a reference cycle. (This cycle
  //     is needed to allow a channel to be shut down cleanly, without shutting
  //     down everything else first.)
  //   - "Running" (message pipe endpoint): A message pipe endpoint is running
  //     if messages written to it (via some |MessagePipeDispatcher|, to which
  //     some |MojoHandle| is assigned) are being transmitted through the
  //     channel.
  //       - Before a message pipe endpoint is run, it will queue messages.
  //       - When a message pipe endpoint is detached from a channel, it is also
  //         taken out of the running state. After that point, messages should
  //         no longer be written to it.
  //   - "Normal" message pipe endpoint (state): The channel itself does not
  //     have knowledge of whether a message pipe endpoint has started running
  //     yet. It will *receive* messages for a message pipe in either state (but
  //     the message pipe endpoint won't *send* messages to the channel if it
  //     has not started running).
  //   - "Zombie" message pipe endpoint (state): A message pipe endpoint is a
  //     zombie if it is still in |local_id_to_endpoint_info_map_|, but the
  //     channel is no longer forwarding messages to it (even if it may still be
  //     receiving messages for it).
  //       - There are various types of zombies, depending on the reason the
  //         message pipe endpoint cannot yet be removed.
  //       - If the remote side is closed, it will send a "remove" control
  //         message. After the channel receives that message (to which it
  //         responds with a "remove ack" control message), it knows that it
  //         shouldn't receive any more messages for that message pipe endpoint
  //         (local ID), but it must wait for the endpoint to detach. (It can't
  //         do so without a race, since it can't call into the message pipe
  //         under |lock_|.) [TODO(vtl): When I add remotely-allocated IDs,
  //         we'll have to remove the |EndpointInfo| from
  //         |local_id_to_endpoint_info_map_| -- i.e., remove the local ID,
  //         since it's no longer valid and may be reused by the remote side --
  //         and keep the |EndpointInfo| alive in some other way.]
  //       - If the local side is closed and the message pipe endpoint was
  //         already running (so there are no queued messages left to send), it
  //         will detach the endpoint, and send a "remove" control message.
  //         However, the channel may still receive messages for that endpoint
  //         until it receives a "remove ack" control message.
  //       - If the local side is closed but the message pipe endpoint was not
  //         yet running , the detaching is delayed until after it is run and
  //         all the queued messages are sent to the channel. On being detached,
  //         things proceed as in one of the above cases. The endpoint is *not*
  //         a zombie until it is detached (or a "remove" message is received).
  //         [TODO(vtl): Maybe we can get rid of this case? It'd only not yet be
  //         running since under the current scheme it wouldn't have a remote ID
  //         yet.]
  //       - Note that even if the local side is closed, it may still receive a
  //         "remove" message from the other side (if the other side is closed
  //         simultaneously, and both sides send "remove" messages). In that
  //         case, it must still remain alive until it receives the "remove
  //         ack" (and it must ack the "remove" message that it received).
  struct EndpointInfo {
    enum State {
      // Attached, possibly running or not.
      STATE_NORMAL,
      // "Zombie" states:
      // Waiting for |DetachMessagePipeEndpoint()| before removing.
      STATE_WAIT_LOCAL_DETACH,
      // Waiting for a |kSubtypeChannelRemoveMessagePipeEndpointAck| before
      // removing.
      STATE_WAIT_REMOTE_REMOVE_ACK,
    };

    EndpointInfo();
    EndpointInfo(scoped_refptr<MessagePipe> message_pipe, unsigned port);
    ~EndpointInfo();

    State state;
    scoped_refptr<MessagePipe> message_pipe;
    unsigned port;
  };

  friend class base::RefCountedThreadSafe<Channel>;
  virtual ~Channel();

  // |RawChannel::Delegate| implementation:
  virtual void OnReadMessage(
      const MessageInTransit::View& message_view,
      embedder::ScopedPlatformHandleVectorPtr platform_handles) OVERRIDE;
  virtual void OnError(Error error) OVERRIDE;

  // Helpers for |OnReadMessage|:
  void OnReadMessageForDownstream(
      const MessageInTransit::View& message_view,
      embedder::ScopedPlatformHandleVectorPtr platform_handles);
  void OnReadMessageForChannel(
      const MessageInTransit::View& message_view,
      embedder::ScopedPlatformHandleVectorPtr platform_handles);

  // Removes the message pipe endpoint with the given local ID, which must exist
  // and be a zombie, and given remote ID. Returns false on failure, in
  // particular if no message pipe with |local_id| is attached.
  bool RemoveMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
                                 MessageInTransit::EndpointId remote_id);

  // Handles errors (e.g., invalid messages) from the remote side.
  void HandleRemoteError(const base::StringPiece& error_message);
  // Handles internal errors/failures from the local side.
  void HandleLocalError(const base::StringPiece& error_message);

  // Helper to send channel control messages. Returns true on success. Should be
  // called *without* |lock_| held.
  bool SendControlMessage(MessageInTransit::Subtype subtype,
                          MessageInTransit::EndpointId source_id,
                          MessageInTransit::EndpointId destination_id);

  base::ThreadChecker creation_thread_checker_;

  embedder::PlatformSupport* const platform_support_;

  // Note: |MessagePipe|s MUST NOT be used under |lock_|. I.e., |lock_| can only
  // be acquired after |MessagePipe::lock_|, never before. Thus to call into a
  // |MessagePipe|, a reference should be acquired from
  // |local_id_to_endpoint_info_map_| under |lock_| (e.g., by copying the
  // |EndpointInfo|) and then the lock released.
  base::Lock lock_;  // Protects the members below.

  scoped_ptr<RawChannel> raw_channel_;
  bool is_running_;
  // Set when |WillShutdownSoon()| is called.
  bool is_shutting_down_;

  typedef base::hash_map<MessageInTransit::EndpointId, EndpointInfo>
      IdToEndpointInfoMap;
  IdToEndpointInfoMap local_id_to_endpoint_info_map_;
  // The next local ID to try (when allocating new local IDs). Note: It should
  // be checked for existence before use.
  MessageInTransit::EndpointId next_local_id_;

  DISALLOW_COPY_AND_ASSIGN(Channel);
};

}  // namespace system
}  // namespace mojo

#endif  // MOJO_SYSTEM_CHANNEL_H_