View Javadoc

1   /*
2    * EL4J, the Extension Library for the J2EE, adds incremental enhancements to
3    * the spring framework, http://el4j.sf.net
4    * Copyright (C) 2006 by ELCA Informatique SA, Av. de la Harpe 22-24,
5    * 1000 Lausanne, Switzerland, http://www.elca.ch
6    *
7    * EL4J is published under the GNU Lesser General Public License (LGPL)
8    * Version 2.1. See http://www.gnu.org/licenses/
9    *
10   * This program is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13   * GNU Lesser General Public License for more details.
14   *
15   * For alternative licensing, please contact info@elca.ch
16   */
17  package ch.elca.el4j.services.tcpforwarder;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.OutputStream;
22  import java.net.Socket;
23  
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  import org.springframework.util.Assert;
27  
28  /**
29   * Thread to forward got input to the output.
30   *
31   * @svnLink $Revision: 3884 $;$Date: 2009-08-04 15:48:31 +0200 (Di, 04. Aug 2009) $;$Author: swismer $;$URL: https://el4j.svn.sourceforge.net/svnroot/el4j/branches/el4j_3_1/el4j/framework/modules/tcp_forwarder/src/main/java/ch/elca/el4j/services/tcpforwarder/UnidirectionalForwarderThread.java $
32   *
33   * @author Martin Zeltner (MZE)
34   */
35  public class UnidirectionalForwarderThread extends Thread {
36  	/**
37  	 * Is the forwarder buffer size.
38  	 */
39  	public static final int BUFFER_SIZE = 4096;
40  	
41  	/**
42  	 * Private logger.
43  	 */
44  	private static Logger s_logger
45  		= LoggerFactory.getLogger(UnidirectionalForwarderThread.class);
46  	
47  	/**
48  	 * Is the link <code>this</code> belongs to.
49  	 */
50  	protected final Link m_link;
51  	
52  	/**
53  	 * Is the input source.
54  	 */
55  	protected final Socket m_inputSocket;
56  	
57  	/**
58  	 * Is the output drain.
59  	 */
60  	protected final Socket m_outputSocket;
61  	
62  	/**
63  	 * Flag to mark if the forwarding is done.
64  	 */
65  	protected volatile boolean m_done = false;
66  	
67  	/**
68  	 * Is the input stream of the input socket.
69  	 */
70  	private InputStream m_in;
71  	
72  	/**
73  	 * Is the output stream of the output socket.
74  	 */
75  	private OutputStream m_out;
76  
77  	/**
78  	 * Constructor.
79  	 *
80  	 * @param link Is the link <code>this</code> belongs to.
81  	 * @param inputSocket Is the input source.
82  	 * @param outputSocket Is the output drain.
83  	 */
84  	public UnidirectionalForwarderThread(Link link, Socket inputSocket,
85  		Socket outputSocket) {
86  		Assert.notNull(link);
87  		Assert.notNull(inputSocket);
88  		Assert.notNull(outputSocket);
89  		
90  		m_link = link;
91  		m_inputSocket = inputSocket;
92  		m_outputSocket = outputSocket;
93  	}
94  
95  	/**
96  	 * @return Return <code>true</code> if forwarding is done.
97  	 */
98  	public boolean isDone() {
99  		return m_done;
100 	}
101 
102 	/**
103 	 * Forwards data from input to output.
104 	 *
105 	 * {@inheritDoc}
106 	 */
107 	public void run() {
108 		m_in = null;
109 		m_out = null;
110 		try {
111 			m_in = m_inputSocket.getInputStream();
112 			m_out = m_outputSocket.getOutputStream();
113 
114 			byte[] buffer = new byte[BUFFER_SIZE];
115 			int readBytes = 0;
116 
117 			while (!m_done) {
118 				while (readBytes == 0) {
119 					try {
120 						readBytes
121 							= m_in.read(buffer, 0, BUFFER_SIZE);
122 					} catch (IOException e) {
123 						if (m_done) {
124 							return;
125 						}
126 						break;
127 					}
128 				}
129 				
130 				if (readBytes < 0) {
131 					m_done = true;
132 				} else if (readBytes > 0) {
133 					try {
134 						m_out.write(buffer, 0, readBytes);
135 					} catch (IOException e) {
136 						if (m_done) {
137 							return;
138 						}
139 					}
140 					readBytes = 0;
141 				}
142 			}
143 		} catch (Throwable t) {
144 			s_logger.error("There was a problem while forwarding data. "
145 				+ "Forwarder thread will now end smoothly.", t);
146 		} finally {
147 			smoothHalt();
148 			m_link.done();
149 		}
150 	}
151 
152 	/**
153 	 * Smmothly stops the forwarder thread.
154 	 */
155 	public void smoothHalt() {
156 		m_done = true;
157 		try {
158 			if (m_in != null) {
159 				if (m_inputSocket != null) {
160 					m_inputSocket.shutdownInput();
161 				} else {
162 					m_in.close();
163 				}
164 				m_in = null;
165 			}
166 		} catch (Exception e) {
167 			s_logger.debug(
168 				"Exception while smoothly closing the input side.", e);
169 		}
170 		try {
171 			if (m_out != null) {
172 				m_out.flush();
173 				if (null != m_outputSocket) {
174 					m_outputSocket.shutdownOutput();
175 				} else {
176 					m_out.close();
177 				}
178 				m_out = null;
179 			}
180 		} catch (Exception e) {
181 			s_logger.debug(
182 				"Exception while smoothly closing the output side.", e);
183 		}
184 	}
185 	
186 	/**
187 	 * Immediately stops the forwarder thread.
188 	 */
189 	public void halt() {
190 		m_done = true;
191 		if (m_inputSocket != null) {
192 			try {
193 				m_inputSocket.close();
194 			} catch (IOException e) {
195 				s_logger.debug("Exception while closing input socket.", e);
196 			}
197 		}
198 		if (m_outputSocket != null) {
199 			try {
200 				m_outputSocket.close();
201 			} catch (IOException e) {
202 				s_logger.debug("Exception while closing output socket.", e);
203 			}
204 		}
205 		if (m_in != null) {
206 			try {
207 				m_in.close();
208 			} catch (IOException e) {
209 				s_logger.debug("Exception while closing input stream.", e);
210 			}
211 		}
212 		if (m_out != null) {
213 			try {
214 				m_out.close();
215 			} catch (IOException e) {
216 				s_logger.debug("Exception while closing output stream.", e);
217 			}
218 		}
219 	}
220 }