// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

part of core;

class MojoEventStream extends Stream<int> {
  // The underlying Mojo handle.
  MojoHandle _handle;

  // Providing our own stream controller allows us to take custom actions when
  // listeners pause/resume/etc. their StreamSubscription.
  StreamController _controller;

  // The send port that we give to the handle watcher to notify us of handle
  // events.
  SendPort _sendPort;

  // The receive port on which we listen and receive events from the handle 
  // watcher.
  ReceivePort _receivePort;

  // The signals on this handle that we're interested in.
  MojoHandleSignals _signals;

  // Whether listen has been called.
  bool _isListening;

  MojoEventStream(MojoHandle handle,
                  [MojoHandleSignals signals = MojoHandleSignals.READABLE]) :
      _handle = handle,
      _signals = signals,
      _isListening = false {
    MojoResult result = MojoHandle.register(this);
    if (!result.isOk) {
      throw "Failed to register the MojoHandle: $result.";
    }
  }

  void close() {
    if (_handle != null) {
      MojoHandleWatcher.close(_handle);
      _handle = null;
    }
    if (_receivePort != null) {
      _receivePort.close();
      _receivePort = null;
    }
  }

  StreamSubscription<List<int>> listen(
      void onData(List event),
      {Function onError, void onDone(), bool cancelOnError}) {
    if (_isListening) {
      throw "Listen has already been called: $_handle.";
    }
    _receivePort = new ReceivePort();
    _sendPort = _receivePort.sendPort;
    _controller = new StreamController(sync: true,
        onListen: _onSubscriptionStateChange,
        onCancel: _onSubscriptionStateChange,
        onPause: _onPauseStateChange,
        onResume: _onPauseStateChange);
    _controller.addStream(_receivePort);

    if (_signals != MojoHandleSignals.NONE) {
      var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
      if (!res.isOk) {
        throw "MojoHandleWatcher add failed: $res";
      }
    }

    _isListening = true;
    return _controller.stream.listen(
        onData,
        onError: onError,
        onDone: onDone,
        cancelOnError: cancelOnError);
  }

  void enableSignals(MojoHandleSignals signals) {
    _signals = signals;
    if (_isListening) {
      var res = MojoHandleWatcher.add(_handle, _sendPort, signals.value);
      if (!res.isOk) {
        throw "MojoHandleWatcher add failed: $res";
      }
    }
  }

  void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE);
  void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
  void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);

  void _onSubscriptionStateChange() {
    if (!_controller.hasListener) {
      close();
    }
  }

  void _onPauseStateChange() {
    if (_controller.isPaused) {
      var res = MojoHandleWatcher.remove(_handle);
      if (!res.isOk) {
        throw "MojoHandleWatcher add failed: $res";
      }
    } else {
      var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
      if (!res.isOk) {
        throw "MojoHandleWatcher add failed: $res";
      }
    }
  }

  bool get readyRead => _handle.readyRead;
  bool get readyWrite => _handle.readyWrite;

  String toString() => "$_handle";
}


class MojoEventStreamListener {
  MojoMessagePipeEndpoint _endpoint;
  MojoEventStream _eventStream;
  bool _isOpen = false;
  bool _isInHandler = false;

  MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) :
      _endpoint = endpoint,
      _eventStream = new MojoEventStream(endpoint.handle),
      _isOpen = false;

  MojoEventStreamListener.fromHandle(MojoHandle handle) {
    _endpoint = new MojoMessagePipeEndpoint(handle);
    _eventStream = new MojoEventStream(handle);
    _isOpen = false;
  }

  MojoEventStreamListener.unbound() :
      _endpoint = null,
      _eventStream = null,
      _isOpen = false;

  void bind(MojoMessagePipeEndpoint endpoint) {
    assert(!isBound);
    _endpoint = endpoint;
    _eventStream = new MojoEventStream(endpoint.handle);
    _isOpen = false;
  }

  void bindFromHandle(MojoHandle handle) {
    assert(!isBound);
    _endpoint = new MojoMessagePipeEndpoint(handle);
    _eventStream = new MojoEventStream(handle);
    _isOpen = false;
  }

  StreamSubscription<int> listen() {
    _isOpen = true;
    return _eventStream.listen((List<int> event) {
      var signalsWatched = new MojoHandleSignals(event[0]);
      var signalsReceived = new MojoHandleSignals(event[1]);
      if (signalsReceived.isPeerClosed) {
        handlePeerClosed();
        // The peer being closed obviates any other signal we might
        // have received since we won't be able to read or write the handle.
        // Thus, we just return before invoking other handlers.
        return;
      }
      _isInHandler = true;
      if (signalsReceived.isReadable) {
        assert(_eventStream.readyRead);
        handleRead();
      }
      if (signalsReceived.isWritable) {
        assert(_eventStream.readyWrite);
        handleWrite();
      }
      _eventStream.enableSignals(enableSignals(
          signalsWatched, signalsReceived));
      _isInHandler = false;
    });
  }

  void close() {
    if (_isOpen) {
      _eventStream.close();
      _isOpen = false;
      _eventStream = null;
      _endpoint = null;
    }
  }

  void handleRead() {}
  void handleWrite() {}
  void handlePeerClosed() {
    close();
  }

  MojoHandleSignals enableSignals(MojoHandleSignals watched,
                                  MojoHandleSignals received) => watched;

  MojoMessagePipeEndpoint get endpoint => _endpoint;
  bool get isOpen => _isOpen;
  bool get isInHandler => _isInHandler;
  bool get isBound => _endpoint != null;
}