summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java
blob: 9d741d8a7b9aaf450f9ab7bcac66d7733cc4622f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
 * ExecutorReactor.java February 2007
 *
 * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
 * implied. See the License for the specific language governing 
 * permissions and limitations under the License.
 */

package org.simpleframework.transport.reactor;

import java.io.IOException;
import java.util.concurrent.Executor;

/**
 * The <code>ExecutorReactor</code> is used to schedule operation for
 * execution using an <code>Executor</code> implementation. This can be
 * useful when the operations performed are time intensive. For example
 * if the operations performed a read of the underlying channel and 
 * then had to parse the contents of the payload. Such operations would
 * reduce the performance of the reactor if it could not delegate to
 * some other form of executor, as it would delay their execution.
 *
 * @author Niall Gallagher
 */
public class ExecutorReactor implements Reactor {

  /**
   * This is used to distribute the ready operations for execution.
   */         
  private final OperationDistributor exchange;

  /**
   * This is used to execute the operations that ready to run.
   */ 
  private final Executor executor;
  
  /**
   * Constructor for the <code>ExecutorReactor</code> object. This is
   * used to create a reactor that can delegate to the executor. This
   * also accepts the operations it is interested in, the value is
   * taken from the <code>SelectionKey</code> object. A bit mask can
   * be used to show interest in several operations at once.
   *
   * @param executor this is the executor used to run the operations
   */  
  public ExecutorReactor(Executor executor) throws IOException {
     this(executor, 1);
  }
  
  /**
   * Constructor for the <code>ExecutorReactor</code> object. This is
   * used to create a reactor that can delegate to the executor. This
   * also accepts the operations it is interested in, the value is
   * taken from the <code>SelectionKey</code> object. A bit mask can
   * be used to show interest in several operations at once.
   *
   * @param executor this is the executor used to run the operations
   * @param count this is the number of distributors to be used
   */  
  public ExecutorReactor(Executor executor, int count) throws IOException {    
     this(executor, count, 120000);
  }  

  /**
   * Constructor for the <code>ExecutorReactor</code> object. This is
   * used to create a reactor that can delegate to the executor. This
   * also accepts the operations it is interested in, the value is
   * taken from the <code>SelectionKey</code> object. A bit mask can
   * be used to show interest in several operations at once.
   *
   * @param executor this is the executor used to run the operations
   * @param count this is the number of distributors to be used
   * @param expiry the length of time to maintain and idle operation
   */    
  public ExecutorReactor(Executor executor, int count, long expiry) throws IOException {    
    this.exchange = new PartitionDistributor(executor, count, expiry);    
    this.executor = executor;
  }

  /**
   * This method is used to execute the provided operation without
   * the need to specifically check for I/O events. This is used if
   * the operation knows that the <code>SelectableChannel</code> is
   * ready, or if the I/O operation can be performed without knowing
   * if the channel is ready. Typically this is an efficient means
   * to perform a poll rather than a select on the channel.
   *
   * @param task this is the task to execute immediately
   */   
  public void process(Operation task) throws IOException {
     executor.execute(task);          
  }
  
  /**        
   * This method is used to execute the provided operation when there
   * is an I/O event that task is interested in. This will used the
   * operations <code>SelectableChannel</code> object to determine 
   * the events that are ready on the channel. If this reactor is
   * interested in any of the ready events then the task is executed.
   *
   * @param task this is the task to execute on interested events
   * @param require this is the bit-mask value for interested events
   */  
  public void process(Operation task, int require) throws IOException {         
     exchange.process(task, require);    
  }        

  /**
   * This is used to stop the reactor so that further requests to
   * execute operations does nothing. This will clean up all of 
   * the reactors resources and unregister any operations that are
   * currently awaiting execution. This should be used to ensure
   * any threads used by the reactor gracefully stop.
   */   
  public void stop() throws IOException {
     exchange.close();
  }
}