448 lines
16 KiB
Java
448 lines
16 KiB
Java
/*
|
|
* Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
|
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
|
*
|
|
* This code is free software; you can redistribute it and/or modify it
|
|
* under the terms of the GNU General Public License version 2 only, as
|
|
* published by the Free Software Foundation. Oracle designates this
|
|
* particular file as subject to the "Classpath" exception as provided
|
|
* by Oracle in the LICENSE file that accompanied this code.
|
|
*
|
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
* version 2 for more details (a copy is included in the LICENSE file that
|
|
* accompanied this code).
|
|
*
|
|
* You should have received a copy of the GNU General Public License version
|
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
*
|
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
|
* or visit www.oracle.com if you need additional information or have any
|
|
* questions.
|
|
*/
|
|
package sun.rmi.transport.tcp;
|
|
|
|
import java.io.*;
|
|
import java.util.*;
|
|
import java.rmi.server.LogStream;
|
|
|
|
import sun.rmi.runtime.Log;
|
|
|
|
/**
|
|
* ConnectionMultiplexer manages the transparent multiplexing of
|
|
* multiple virtual connections from one endpoint to another through
|
|
* one given real connection to that endpoint. The input and output
|
|
* streams for the the underlying real connection must be supplied.
|
|
* A callback object is also supplied to be informed of new virtual
|
|
* connections opened by the remote endpoint. After creation, the
|
|
* run() method must be called in a thread created for demultiplexing
|
|
* the connections. The openConnection() method is called to
|
|
* initiate a virtual connection from this endpoint.
|
|
*
|
|
* @author Peter Jones
|
|
*/
|
|
@SuppressWarnings("deprecation")
|
|
final class ConnectionMultiplexer {
|
|
|
|
/** "multiplex" log level */
|
|
static int logLevel = LogStream.parseLevel(getLogLevel());
|
|
|
|
private static String getLogLevel() {
|
|
return java.security.AccessController.doPrivileged(
|
|
new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
|
|
}
|
|
|
|
/* multiplex system log */
|
|
static final Log multiplexLog =
|
|
Log.getLog("sun.rmi.transport.tcp.multiplex",
|
|
"multiplex", ConnectionMultiplexer.logLevel);
|
|
|
|
/** multiplexing protocol operation codes */
|
|
private final static int OPEN = 0xE1;
|
|
private final static int CLOSE = 0xE2;
|
|
private final static int CLOSEACK = 0xE3;
|
|
private final static int REQUEST = 0xE4;
|
|
private final static int TRANSMIT = 0xE5;
|
|
|
|
/** object to notify for new connections from remote endpoint */
|
|
private TCPChannel channel;
|
|
|
|
/** input stream for underlying single connection */
|
|
private InputStream in;
|
|
|
|
/** output stream for underlying single connection */
|
|
private OutputStream out;
|
|
|
|
/** true if underlying connection originated from this endpoint
|
|
(used for generating unique connection IDs) */
|
|
private boolean orig;
|
|
|
|
/** layered stream for reading formatted data from underlying connection */
|
|
private DataInputStream dataIn;
|
|
|
|
/** layered stream for writing formatted data to underlying connection */
|
|
private DataOutputStream dataOut;
|
|
|
|
/** table holding currently open connection IDs and related info */
|
|
private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
|
|
|
|
/** number of currently open connections */
|
|
private int numConnections = 0;
|
|
|
|
/** maximum allowed open connections */
|
|
private final static int maxConnections = 256;
|
|
|
|
/** ID of last connection opened */
|
|
private int lastID = 0x1001;
|
|
|
|
/** true if this mechanism is still alive */
|
|
private boolean alive = true;
|
|
|
|
/**
|
|
* Create a new ConnectionMultiplexer using the given underlying
|
|
* input/output stream pair. The run method must be called
|
|
* (possibly on a new thread) to handle the demultiplexing.
|
|
* @param channel object to notify when new connection is received
|
|
* @param in input stream of underlying connection
|
|
* @param out output stream of underlying connection
|
|
* @param orig true if this endpoint intiated the underlying
|
|
* connection (needs to be set differently at both ends)
|
|
*/
|
|
public ConnectionMultiplexer(
|
|
TCPChannel channel,
|
|
InputStream in,
|
|
OutputStream out,
|
|
boolean orig)
|
|
{
|
|
this.channel = channel;
|
|
this.in = in;
|
|
this.out = out;
|
|
this.orig = orig;
|
|
|
|
dataIn = new DataInputStream(in);
|
|
dataOut = new DataOutputStream(out);
|
|
}
|
|
|
|
/**
|
|
* Process multiplexing protocol received from underlying connection.
|
|
*/
|
|
public void run() throws IOException
|
|
{
|
|
try {
|
|
int op, id, length;
|
|
MultiplexConnectionInfo info;
|
|
|
|
while (true) {
|
|
|
|
// read next op code from remote endpoint
|
|
op = dataIn.readUnsignedByte();
|
|
switch (op) {
|
|
|
|
// remote endpoint initiating new connection
|
|
case OPEN:
|
|
id = dataIn.readUnsignedShort();
|
|
|
|
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
|
multiplexLog.log(Log.VERBOSE, "operation OPEN " + id);
|
|
}
|
|
|
|
info = connectionTable.get(id);
|
|
if (info != null)
|
|
throw new IOException(
|
|
"OPEN: Connection ID already exists");
|
|
info = new MultiplexConnectionInfo(id);
|
|
info.in = new MultiplexInputStream(this, info, 2048);
|
|
info.out = new MultiplexOutputStream(this, info, 2048);
|
|
synchronized (connectionTable) {
|
|
connectionTable.put(id, info);
|
|
++ numConnections;
|
|
}
|
|
sun.rmi.transport.Connection conn;
|
|
conn = new TCPConnection(channel, info.in, info.out);
|
|
channel.acceptMultiplexConnection(conn);
|
|
break;
|
|
|
|
// remote endpoint closing connection
|
|
case CLOSE:
|
|
id = dataIn.readUnsignedShort();
|
|
|
|
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
|
multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id);
|
|
}
|
|
|
|
info = connectionTable.get(id);
|
|
if (info == null)
|
|
throw new IOException(
|
|
"CLOSE: Invalid connection ID");
|
|
info.in.disconnect();
|
|
info.out.disconnect();
|
|
if (!info.closed)
|
|
sendCloseAck(info);
|
|
synchronized (connectionTable) {
|
|
connectionTable.remove(id);
|
|
-- numConnections;
|
|
}
|
|
break;
|
|
|
|
// remote endpoint acknowledging close of connection
|
|
case CLOSEACK:
|
|
id = dataIn.readUnsignedShort();
|
|
|
|
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
|
multiplexLog.log(Log.VERBOSE,
|
|
"operation CLOSEACK " + id);
|
|
}
|
|
|
|
info = connectionTable.get(id);
|
|
if (info == null)
|
|
throw new IOException(
|
|
"CLOSEACK: Invalid connection ID");
|
|
if (!info.closed)
|
|
throw new IOException(
|
|
"CLOSEACK: Connection not closed");
|
|
info.in.disconnect();
|
|
info.out.disconnect();
|
|
synchronized (connectionTable) {
|
|
connectionTable.remove(id);
|
|
-- numConnections;
|
|
}
|
|
break;
|
|
|
|
// remote endpoint declaring additional bytes receivable
|
|
case REQUEST:
|
|
id = dataIn.readUnsignedShort();
|
|
info = connectionTable.get(id);
|
|
if (info == null)
|
|
throw new IOException(
|
|
"REQUEST: Invalid connection ID");
|
|
length = dataIn.readInt();
|
|
|
|
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
|
multiplexLog.log(Log.VERBOSE,
|
|
"operation REQUEST " + id + ": " + length);
|
|
}
|
|
|
|
info.out.request(length);
|
|
break;
|
|
|
|
// remote endpoint transmitting data packet
|
|
case TRANSMIT:
|
|
id = dataIn.readUnsignedShort();
|
|
info = connectionTable.get(id);
|
|
if (info == null)
|
|
throw new IOException("SEND: Invalid connection ID");
|
|
length = dataIn.readInt();
|
|
|
|
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
|
multiplexLog.log(Log.VERBOSE,
|
|
"operation TRANSMIT " + id + ": " + length);
|
|
}
|
|
|
|
info.in.receive(length, dataIn);
|
|
break;
|
|
|
|
default:
|
|
throw new IOException("Invalid operation: " +
|
|
Integer.toHexString(op));
|
|
}
|
|
}
|
|
} finally {
|
|
shutDown();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Initiate a new multiplexed connection through the underlying
|
|
* connection.
|
|
*/
|
|
public synchronized TCPConnection openConnection() throws IOException
|
|
{
|
|
// generate ID that should not be already used
|
|
// If all possible 32768 IDs are used,
|
|
// this method will block searching for a new ID forever.
|
|
int id;
|
|
do {
|
|
lastID = (++ lastID) & 0x7FFF;
|
|
id = lastID;
|
|
|
|
// The orig flag (copied to the high bit of the ID) is used
|
|
// to have two distinct ranges to choose IDs from for the
|
|
// two endpoints.
|
|
if (orig)
|
|
id |= 0x8000;
|
|
} while (connectionTable.get(id) != null);
|
|
|
|
// create multiplexing streams and bookkeeping information
|
|
MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
|
|
info.in = new MultiplexInputStream(this, info, 2048);
|
|
info.out = new MultiplexOutputStream(this, info, 2048);
|
|
|
|
// add to connection table if multiplexer has not died
|
|
synchronized (connectionTable) {
|
|
if (!alive)
|
|
throw new IOException("Multiplexer connection dead");
|
|
if (numConnections >= maxConnections)
|
|
throw new IOException("Cannot exceed " + maxConnections +
|
|
" simultaneous multiplexed connections");
|
|
connectionTable.put(id, info);
|
|
++ numConnections;
|
|
}
|
|
|
|
// inform remote endpoint of new connection
|
|
synchronized (dataOut) {
|
|
try {
|
|
dataOut.writeByte(OPEN);
|
|
dataOut.writeShort(id);
|
|
dataOut.flush();
|
|
} catch (IOException e) {
|
|
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
|
|
|
shutDown();
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
return new TCPConnection(channel, info.in, info.out);
|
|
}
|
|
|
|
/**
|
|
* Shut down all connections and clean up.
|
|
*/
|
|
public void shutDown()
|
|
{
|
|
// inform all associated streams
|
|
synchronized (connectionTable) {
|
|
// return if multiplexer already officially dead
|
|
if (!alive)
|
|
return;
|
|
alive = false;
|
|
|
|
Enumeration<MultiplexConnectionInfo> enum_ =
|
|
connectionTable.elements();
|
|
while (enum_.hasMoreElements()) {
|
|
MultiplexConnectionInfo info = enum_.nextElement();
|
|
info.in.disconnect();
|
|
info.out.disconnect();
|
|
}
|
|
connectionTable.clear();
|
|
numConnections = 0;
|
|
}
|
|
|
|
// close underlying connection, if possible (and not already done)
|
|
try {
|
|
in.close();
|
|
} catch (IOException e) {
|
|
}
|
|
try {
|
|
out.close();
|
|
} catch (IOException e) {
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Send request for more data on connection to remote endpoint.
|
|
* @param info connection information structure
|
|
* @param len number of more bytes that can be received
|
|
*/
|
|
void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
|
|
{
|
|
synchronized (dataOut) {
|
|
if (alive && !info.closed)
|
|
try {
|
|
dataOut.writeByte(REQUEST);
|
|
dataOut.writeShort(info.id);
|
|
dataOut.writeInt(len);
|
|
dataOut.flush();
|
|
} catch (IOException e) {
|
|
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
|
|
|
shutDown();
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Send packet of requested data on connection to remote endpoint.
|
|
* @param info connection information structure
|
|
* @param buf array containing bytes to send
|
|
* @param off offset of first array index of packet
|
|
* @param len number of bytes in packet to send
|
|
*/
|
|
void sendTransmit(MultiplexConnectionInfo info,
|
|
byte buf[], int off, int len) throws IOException
|
|
{
|
|
synchronized (dataOut) {
|
|
if (alive && !info.closed)
|
|
try {
|
|
dataOut.writeByte(TRANSMIT);
|
|
dataOut.writeShort(info.id);
|
|
dataOut.writeInt(len);
|
|
dataOut.write(buf, off, len);
|
|
dataOut.flush();
|
|
} catch (IOException e) {
|
|
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
|
|
|
shutDown();
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Inform remote endpoint that connection has been closed.
|
|
* @param info connection information structure
|
|
*/
|
|
void sendClose(MultiplexConnectionInfo info) throws IOException
|
|
{
|
|
info.out.disconnect();
|
|
synchronized (dataOut) {
|
|
if (alive && !info.closed)
|
|
try {
|
|
dataOut.writeByte(CLOSE);
|
|
dataOut.writeShort(info.id);
|
|
dataOut.flush();
|
|
info.closed = true;
|
|
} catch (IOException e) {
|
|
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
|
|
|
shutDown();
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Acknowledge remote endpoint's closing of connection.
|
|
* @param info connection information structure
|
|
*/
|
|
void sendCloseAck(MultiplexConnectionInfo info) throws IOException
|
|
{
|
|
synchronized (dataOut) {
|
|
if (alive && !info.closed)
|
|
try {
|
|
dataOut.writeByte(CLOSEACK);
|
|
dataOut.writeShort(info.id);
|
|
dataOut.flush();
|
|
info.closed = true;
|
|
} catch (IOException e) {
|
|
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
|
|
|
shutDown();
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Shut down connection upon finalization.
|
|
*/
|
|
protected void finalize() throws Throwable
|
|
{
|
|
super.finalize();
|
|
shutDown();
|
|
}
|
|
}
|