Preface
This article continues the walkthrough of how NIO is built, which is background knowledge for Reactor-Netty. We have also released videos for Methodology of Java Programming, covering RxJava and Reactor Java.
PS: Chinese versions of this series, Methodology of Java Programming (RxJava) and Methodology of Java Programming (Reactor Java), are available on YouTube and Bilibili.
All versions are based on JDK 11.
Introduction
Followed by the last article From BIO to NIO —— NIO source code interpretation 1
.
Make Channel Implement socket ability
Our first aim is to extend what a Socket can do. To give a Channel the abilities of a Socket, the JDK defines the interface java.nio.channels.NetworkChannel:
public interface NetworkChannel extends Channel {
    NetworkChannel bind(SocketAddress local) throws IOException;

    SocketAddress getLocalAddress() throws IOException;

    <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;

    <T> T getOption(SocketOption<T> name) throws IOException;

    Set<SocketOption<?>> supportedOptions();
}
bind(SocketAddress) binds the socket to a local SocketAddress, and getLocalAddress() returns the address the socket is bound to. setOption(SocketOption, Object) and getOption(SocketOption) set and read the options of the socket.
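To make the interface concrete, here is a minimal sketch that exercises these methods through a ServerSocketChannel. The class name NetworkChannelDemo and the helper bindAndGetPort are made up for illustration, and binding to the loopback address with port 0 is an assumption so that the example does not depend on a fixed free port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;

// Hypothetical demo class; not part of the JDK.
class NetworkChannelDemo {

    // Binds the channel to an ephemeral loopback port and returns that port.
    static int bindAndGetPort(NetworkChannel ch) throws IOException {
        // Options are usually set before bind().
        ch.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        // Port 0 asks the system to pick a free port.
        ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return ((InetSocketAddress) ch.getLocalAddress()).getPort();
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            Set<SocketOption<?>> options = ch.supportedOptions();
            System.out.println("supported options: " + options);
            System.out.println("bound to port: " + bindAndGetPort(ch));
            System.out.println("SO_REUSEADDR = "
                    + ch.getOption(StandardSocketOptions.SO_REUSEADDR));
        }
    }
}
```

Because bindAndGetPort only depends on NetworkChannel, the same helper would also accept other implementations of the interface.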
bind() and accept()
Now let's turn back to java.nio.channels.ServerSocketChannel and sun.nio.ch.ServerSocketChannelImpl. First, the implementation of bind:
//sun.nio.ch.ServerSocketChannelImpl#bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        // localAddress is non-null once bind has already been called
        if (localAddress != null)
            throw new AlreadyBoundException();
        // a null address becomes InetSocketAddress(0): the wildcard address
        // with a port chosen by the system
        InetSocketAddress isa = (local == null)
                                ? new InetSocketAddress(0)
                                : Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        // start listening; a backlog smaller than 1 defaults to 50
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        localAddress = Net.localAddress(fd);
    }
    return this;
}
Next, let's see how bind and listen are implemented in Net.
Net.bind
//sun.nio.ch.Net#bind(java.io.FileDescriptor, java.net.InetAddress, int)
public static void bind(FileDescriptor fd, InetAddress addr, int port)
    throws IOException
{
    bind(UNSPEC, fd, addr, port);
}

static void bind(ProtocolFamily family, FileDescriptor fd,
                 InetAddress addr, int port) throws IOException
{
    // prefer IPv6 when it is available and the family is not explicitly IPv4
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    bind0(fd, preferIPv6, exclusiveBind, addr, port);
}

private static native void bind0(FileDescriptor fd, boolean preferIPv6,
                                 boolean useExclBind, InetAddress addr,
                                 int port)
    throws IOException;
bind0 is a native method; its implementation is:
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
                          jboolean useExclBind, jobject iao, int port)
{
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv = 0;
    // convert the Java InetAddress into a C struct sockaddr
    if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len,
                                  preferIPv6) != 0) {
        return; // conversion failed
    }
    // calls bind: int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen)
    rv = NET_Bind(fdval(env, fdo), &sa, sa_len);
    if (rv != 0) {
        handleSocketError(env, errno);
    }
}
A socket is the endpoint through which a program and the kernel communicate, but a freshly created socket has no address or port yet. To give it one, we bind the socket to an address.
In general the kernel binds an address for us automatically, but in some cases the program has to do it explicitly.
The most typical case is a server, which needs to bind a well-known address and wait for clients to connect. Clients usually do not need to invoke bind(); the kernel does it for them automatically.
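The difference between the two sides can be observed from Java. The following is a small sketch, assuming a loopback listener on an ephemeral port; ImplicitBindDemo and connectWithoutBind are hypothetical names. The client never calls bind(), yet it has a local address once connect() returns.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; not part of the JDK.
class ImplicitBindDemo {

    // Returns the local port assigned to a client that never called bind().
    static int connectWithoutBind() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // The server binds explicitly so that clients know where to connect.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open()) {
                if (client.getLocalAddress() != null) {
                    throw new IllegalStateException("a fresh channel should be unbound");
                }
                // connect() binds the client socket implicitly.
                client.connect(server.getLocalAddress());
                return ((InetSocketAddress) client.getLocalAddress()).getPort();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("client port chosen by the system: " + connectWithoutBind());
    }
}
```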
One thing is worth noting: bind only assigns the address. When a new connection arrives, a new socket is created for it and the server works with that new Socket, so the part to focus on is accept. In sun.nio.ch.ServerSocketChannelImpl#bind we can see that it calls Net.listen(fd, backlog < 1 ? 50 : backlog) to start listening. If backlog < 1, the queue of pending connections defaults to a length of 50. Let's check it.
Net.listen
//sun.nio.ch.Net#listen
static native void listen(FileDescriptor fd, int backlog) throws IOException;
Net.listen is a native method as well:
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}
From the code we can see that, at the bottom, it calls the C function listen. listen is called after bind and before accept. Its prototype is int listen(int sockfd, int backlog); it returns 0 on success and -1 on failure.
Now let's turn back to ensureOpen():
//sun.nio.ch.ServerSocketChannelImpl#ensureOpen
// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
    if (!isOpen())
        throw new ClosedChannelException();
}

//java.nio.channels.spi.AbstractInterruptibleChannel#isOpen
public final boolean isOpen() {
    return !closed;
}
If the channel has been closed, it throws ClosedChannelException. Next, Net#checkAddress:
//sun.nio.ch.Net#checkAddress(java.net.SocketAddress)
public static InetSocketAddress checkAddress(SocketAddress sa) {
    if (sa == null) // null address
        throw new NullPointerException();
    // not an InetSocketAddress
    if (!(sa instanceof InetSocketAddress))
        throw new UnsupportedAddressTypeException(); // ## needs arg
    InetSocketAddress isa = (InetSocketAddress)sa;
    // unresolved address
    if (isa.isUnresolved())
        throw new UnresolvedAddressException(); // ## needs arg
    InetAddress addr = isa.getAddress();
    // neither an IPv4 nor an IPv6 address
    if (!(addr instanceof Inet4Address || addr instanceof Inet6Address))
        throw new IllegalArgumentException("Invalid address type");
    return isa;
}
To summarize, bind first checks that the ServerSocketChannel is still open and has not been bound already. If both checks pass, it validates the socket address and then uses bind and listen in the Net utility class to bind the address and actually start listening. If the backlog argument is smaller than 1, the backlog defaults to 50 connections.
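The checks described above can be triggered one by one. This is a minimal sketch under the assumption that the order of checks matches the source shown earlier (open, not yet bound, valid address); BindChecksDemo and its method names are hypothetical, and "example.invalid" is just a placeholder host that is never resolved.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Hypothetical demo class; not part of the JDK.
class BindChecksDemo {

    // A second bind() on the same channel is rejected.
    static String secondBind() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.bind(null); // null: wildcard address, port chosen by the system
            try {
                ch.bind(null);
                return "no exception";
            } catch (AlreadyBoundException e) {
                return "AlreadyBoundException";
            }
        }
    }

    // bind() on a closed channel fails in ensureOpen().
    static String bindAfterClose() throws IOException {
        ServerSocketChannel ch = ServerSocketChannel.open();
        ch.close();
        try {
            ch.bind(null);
            return "no exception";
        } catch (ClosedChannelException e) {
            return "ClosedChannelException";
        }
    }

    // An unresolved address is rejected by the address check.
    static String bindUnresolved() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            try {
                ch.bind(InetSocketAddress.createUnresolved("example.invalid", 8080));
                return "no exception";
            } catch (UnresolvedAddressException e) {
                return "UnresolvedAddressException";
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(secondBind());
        System.out.println(bindAfterClose());
        System.out.println(bindUnresolved());
    }
}
```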
accept
As we did for BIO in the first article, let's now look at accept() in ServerSocketChannel:
//sun.nio.ch.ServerSocketChannelImpl#accept()
@Override
public SocketChannel accept() throws IOException {
    acceptLock.lock();
    try {
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        boolean blocking = isBlocking();
        try {
            begin(blocking);
            do {
                n = accept(this.fd, newfd, isaa);
            } while (n == IOStatus.INTERRUPTED && isOpen());
        } finally {
            end(blocking, n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;
        // a SocketChannelImpl starts out in blocking mode
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        // build the SocketChannelImpl; explained in the SocketChannelImpl part
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                // check access to the remote address and port
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        // return the SocketChannelImpl
        return sc;

    } finally {
        acceptLock.unlock();
    }
}
accept(this.fd, newfd, isaa) accepts a connection that has arrived on the listening socket. As mentioned before, it finally invokes int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen):
- If there is no pending connection in the queue of the listening socket fd and the socket isn't marked as non-blocking, accept() blocks until a connection appears.
- If the socket is marked as non-blocking and there is no pending connection in the queue, accept() returns the error EAGAIN or EWOULDBLOCK.
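At the Java level the second case shows up as accept() returning null rather than as an error code. Below is a minimal sketch of both observations, assuming a loopback listener; NonBlockingAcceptDemo and its methods are hypothetical names, and the short polling loop only stands in for what a Selector would normally do.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class NonBlockingAcceptDemo {

    private static ServerSocketChannel openListener() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        return server;
    }

    // With no pending connection, a non-blocking accept() returns null.
    static boolean idleAcceptReturnsNull() throws IOException {
        try (ServerSocketChannel server = openListener()) {
            return server.accept() == null;
        }
    }

    // The accepted channel starts in blocking mode even though the listener does not.
    static boolean acceptedChannelIsBlocking() throws IOException, InterruptedException {
        try (ServerSocketChannel server = openListener();
             SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            SocketChannel accepted = server.accept();
            while (accepted == null && System.nanoTime() < deadline) {
                Thread.sleep(10); // crude polling, for illustration only
                accepted = server.accept();
            }
            if (accepted == null) {
                throw new IOException("no connection arrived in time");
            }
            try (SocketChannel peer = accepted) {
                return peer.isBlocking();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("idle accept returned null: " + idleAcceptReturnsNull());
        System.out.println("accepted channel is blocking: " + acceptedChannelIsBlocking());
    }
}
```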
The pair begin(blocking) and end(blocking, n > 0) was covered in the part on InterruptibleChannel and interruptible I/O. We mention it again here to see how it is applied. What matters at this point is the wait for a connection, which may be interrupted. If the wait finishes normally, execution simply continues with the code that follows; it does not mean the Channel has been closed. The second argument of end(blocking, n > 0), completed, only reports whether the wait completed, so there is no need to read more into it.
supportedOptions
Now let's look at the other NetworkChannel methods that are implemented, starting with supportedOptions:
//sun.nio.ch.ServerSocketChannelImpl#supportedOptions
@Override
public final Set<SocketOption<?>> supportedOptions() {
    return DefaultOptionsHolder.defaultOptions;
}

//sun.nio.ch.ServerSocketChannelImpl.DefaultOptionsHolder
private static class DefaultOptionsHolder {
    static final Set<SocketOption<?>> defaultOptions = defaultOptions();

    private static Set<SocketOption<?>> defaultOptions() {
        HashSet<SocketOption<?>> set = new HashSet<>();
        set.add(StandardSocketOptions.SO_RCVBUF);
        set.add(StandardSocketOptions.SO_REUSEADDR);
        if (Net.isReusePortAvailable()) {
            set.add(StandardSocketOptions.SO_REUSEPORT);
        }
        set.add(StandardSocketOptions.IP_TOS);
        set.addAll(ExtendedSocketOptions.options(SOCK_STREAM));
        // return an unmodifiable view of the HashSet
        return Collections.unmodifiableSet(set);
    }
}
The options used in the code above are defined as follows:
//java.net.StandardSocketOptions
// size of the socket receive buffer
public static final SocketOption<Integer> SO_RCVBUF =
    new StdSocketOption<Integer>("SO_RCVBUF", Integer.class);
// whether the address may be reused
public static final SocketOption<Boolean> SO_REUSEADDR =
    new StdSocketOption<Boolean>("SO_REUSEADDR", Boolean.class);
// whether the port may be reused
public static final SocketOption<Boolean> SO_REUSEPORT =
    new StdSocketOption<Boolean>("SO_REUSEPORT", Boolean.class);
// the Type of Service (ToS) octet in the IP header
public static final SocketOption<Integer> IP_TOS =
    new StdSocketOption<Integer>("IP_TOS", Integer.class);
Impl for setOption
Now that we know which options are supported, let's look at the details of setOption:
//sun.nio.ch.ServerSocketChannelImpl#setOption
@Override
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
    throws IOException
{
    Objects.requireNonNull(name);
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");
    synchronized (stateLock) {
        ensureOpen();

        if (name == StandardSocketOptions.IP_TOS) {
            ProtocolFamily family = Net.isIPv6Available() ?
                StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
            Net.setSocketOption(fd, family, name, value);
            return this;
        }

        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            isReuseAddress = (Boolean)value;
        } else {
            // no options that require special handling
            Net.setSocketOption(fd, Net.UNSPEC, name, value);
        }
        return this;
    }
}
Here we can see how supportedOptions().contains(name) is used: setOption first checks that the option is supported and only then applies it. For the socket options themselves, the work is mainly done by Net.setSocketOption:
static void setSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption name, Object value) throws IOException{ if (value == null) throw new IllegalArgumentException("Invalid option value"); // only simple values supported by this method Class type = name.type(); if (extendedOptions.isOptionSupported(name)) { extendedOptions.setOption(fd, name, value); return; } //If it doestn't belong to Integer or Boolean, it will throw AssertionError if (type != Integer.class && type != Boolean.class) throw new AssertionError("Should not reach here"); // special handling if (name == StandardSocketOptions.SO_RCVBUF || name == StandardSocketOptions.SO_SNDBUF) { //determine the size of receive and send buffer int i = ((Integer)value).intValue(); if (i < 0) throw new IllegalArgumentException("Invalid send/receive buffer size"); } //If there's data in buffer. it will defer closing socket if (name == StandardSocketOptions.SO_LINGER) { int i = ((Integer)value).intValue(); if (i < 0) value = Integer.valueOf(-1); if (i > 65535) value = Integer.valueOf(65535); } //UDP unicast if (name == StandardSocketOptions.IP_TOS) { int i = ((Integer)value).intValue(); if (i < 0 || i > 255) throw new IllegalArgumentException("Invalid IP_TOS value"); } //UDP multicast if (name == StandardSocketOptions.IP_MULTICAST_TTL) { int i = ((Integer)value).intValue(); if (i < 0 || i > 255) throw new IllegalArgumentException("Invalid TTL/hop value"); } // map option name to platform level/name OptionKey key = SocketOptionRegistry.findOption(name, family); if (key == null) throw new AssertionError("Option not found"); int arg; //convert config if (type == Integer.class) { arg = ((Integer)value).intValue(); } else { boolean b = ((Boolean)value).booleanValue(); arg = (b) ? 1 : 0; } boolean mayNeedConversion = (family == UNSPEC); boolean isIPv6 = (family == StandardProtocolFamily.INET6); //set file descriptor and other arguments setIntOption0(fd, mayNeedConversion, key.level(), key.name(), arg, isIPv6);}复制代码
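The two kinds of rejection can be seen from the public API. This is a minimal sketch, not a statement about every JDK version: it assumes that TCP_NODELAY is absent from a server channel's supportedOptions(), as in the set shown above, and that a negative SO_RCVBUF is rejected by the range check in Net.setSocketOption. SetOptionDemo and its method names are hypothetical.

```java
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; not part of the JDK.
class SetOptionDemo {

    // An option outside supportedOptions() is rejected before touching the socket.
    static String setUnsupportedOption() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            try {
                ch.setOption(StandardSocketOptions.TCP_NODELAY, true);
                return "accepted";
            } catch (UnsupportedOperationException e) {
                return "UnsupportedOperationException";
            }
        }
    }

    // A supported option with an out-of-range value fails the value check.
    static String setNegativeReceiveBuffer() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            try {
                ch.setOption(StandardSocketOptions.SO_RCVBUF, -1);
                return "accepted";
            } catch (IllegalArgumentException e) {
                return "IllegalArgumentException";
            }
        }
    }

    // A valid value is passed down; the system may round or adjust the size.
    static int setReceiveBuffer(int requested) throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.setOption(StandardSocketOptions.SO_RCVBUF, requested);
            return ch.getOption(StandardSocketOptions.SO_RCVBUF);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(setUnsupportedOption());
        System.out.println(setNegativeReceiveBuffer());
        System.out.println("SO_RCVBUF after request of 64 KiB: " + setReceiveBuffer(64 * 1024));
    }
}
```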
getOption
Next, the implementation of getOption:
//sun.nio.ch.ServerSocketChannelImpl#getOption
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name)
    throws IOException
{
    Objects.requireNonNull(name);
    // an unsupported option throws UnsupportedOperationException
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");

    synchronized (stateLock) {
        ensureOpen();
        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            return (T)Boolean.valueOf(isReuseAddress);
        }
        // everything else is handled by Net
        // no options that require special handling
        return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
    }
}

//sun.nio.ch.Net#getSocketOption
static Object getSocketOption(FileDescriptor fd, ProtocolFamily family,
                              SocketOption<?> name)
    throws IOException
{
    Class<?> type = name.type();

    if (extendedOptions.isOptionSupported(name)) {
        return extendedOptions.getOption(fd, name);
    }
    // only Integer and Boolean are supported
    // only simple values supported by this method
    if (type != Integer.class && type != Boolean.class)
        throw new AssertionError("Should not reach here");

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    boolean mayNeedConversion = (family == UNSPEC);
    // read the option from the file descriptor
    int value = getIntOption0(fd, mayNeedConversion, key.level(), key.name());

    if (type == Integer.class) {
        return Integer.valueOf(value);
    } else {
        return (value == 0) ? Boolean.FALSE : Boolean.TRUE;
    }
}
Differences and similarities in bind between ServerSocketChannel and ServerSocket
In the section on Net.bind we said that when a connection is received, a new Socket is created to work with it. We saw this in accept: once the new file descriptor is available, it calls new SocketChannelImpl(provider(), newfd, isa). This raises a question: when we use bind, doesn't there have to be a Socket to bind already? And how does BIO handle this? Let's review.
As we saw before, there is a setImpl() call when we invoke java.net.ServerSocket#ServerSocket(int, int, java.net.InetAddress):
//java.net.ServerSocket public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException { setImpl(); if (port < 0 || port > 0xFFFF) throw new IllegalArgumentException( "Port value out of range: " + port); if (backlog < 1) backlog = 50; try { bind(new InetSocketAddress(bindAddr, port), backlog); } catch(SecurityException e) { close(); throw e; } catch(IOException e) { close(); throw e; } }//java.net.ServerSocket#setImplprivate void setImpl() { if (factory != null) { impl = factory.createSocketImpl(); checkOldImpl(); } else { // No need to do a checkOldImpl() here, we know it's an up to date // SocketImpl! impl = new SocksSocketImpl(); } if (impl != null) impl.setServerSocket(this); }复制代码
The part to focus on, however, is bind(new InetSocketAddress(bindAddr, port), backlog):
//java.net.ServerSocketpublic void bind(SocketAddress endpoint, int backlog) throws IOException { if (isClosed()) throw new SocketException("Socket is closed"); if (!oldImpl && isBound()) throw new SocketException("Already bound"); if (endpoint == null) endpoint = new InetSocketAddress(0); if (!(endpoint instanceof InetSocketAddress)) throw new IllegalArgumentException("Unsupported address type"); InetSocketAddress epoint = (InetSocketAddress) endpoint; if (epoint.isUnresolved()) throw new SocketException("Unresolved address"); if (backlog < 1) backlog = 50; try { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkListen(epoint.getPort()); //Important!! getImpl().bind(epoint.getAddress(), epoint.getPort()); getImpl().listen(backlog); bound = true; } catch(SecurityException e) { bound = false; throw e; } catch(IOException e) { bound = false; throw e; } }复制代码
I have marked the getImpl() call as important. Let's follow the source code to see what it does:
//java.net.ServerSocket#getImpl
SocketImpl getImpl() throws SocketException {
    if (!created)
        createImpl();
    return impl;
}
Since the initial value of created is false, the first call must enter the createImpl() method:
//java.net.ServerSocket#createImpl
void createImpl() throws SocketException {
    if (impl == null)
        setImpl();
    try {
        impl.create(true);
        created = true;
    } catch (IOException e) {
        throw new SocketException(e.getMessage());
    }
}
Since impl has already been assigned, it goes straight to impl.create(true), and then created is set to true. This brings us to the conclusion I was after:
//java.net.AbstractPlainSocketImpl#createprotected synchronized void create(boolean stream) throws IOException { this.stream = stream; if (!stream) { ResourceManager.beforeUdpCreate(); // only create the fd after we know we will be able to create the socket fd = new FileDescriptor(); try { socketCreate(false); SocketCleanable.register(fd); } catch (IOException ioe) { ResourceManager.afterUdpClose(); fd = null; throw ioe; } } else { fd = new FileDescriptor(); socketCreate(true); SocketCleanable.register(fd); } if (socket != null) socket.setCreated(); if (serverSocket != null) serverSocket.setCreated();}复制代码
Note the call socketCreate(true); its implementation is the following:
@Override
void socketCreate(boolean stream) throws IOException {
    if (fd == null)
        throw new SocketException("Socket closed");

    int newfd = socket0(stream);

    fdAccess.set(fd, newfd);
}
socket0(stream) gives us a file descriptor, so this is where the socket is created before it is bound. Now let's turn back to sun.nio.ch.ServerSocketChannelImpl#accept(). accept() deals with what happens once the channel exists and a connection arrives. So, on the server side, where does the socket that we bind come from? With ServerSocketChannel we go through the static factory method open() that the JDK provides, which hides this complexity from us.
java.nio.channels.ServerSocketChannel#open:
//java.nio.channels.ServerSocketChannel#open
public static ServerSocketChannel open() throws IOException {
    return SelectorProvider.provider().openServerSocketChannel();
}

//sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

//sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(SelectorProvider)
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
}

//sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}
As we can see, creating a ServerSocketChannelImpl object is enough to get a socket, which we can then bind. Note, however, that the type returned by ServerSocketChannel#open is ServerSocketChannel. When we accept a connection from a client, a separate Socket has to be created to communicate with it. That is why sun.nio.ch.ServerSocketChannelImpl#accept() does SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa); and returns an object of type SocketChannel. The methods for working with that socket, such as read and write, can then be defined in that class.
Extension of socket method in ServerSocketChannel
For ServerSocketChannel, we also need to know about socket():
//sun.nio.ch.ServerSocketChannelImpl#socket
@Override
public ServerSocket socket() {
    synchronized (stateLock) {
        if (socket == null)
            socket = ServerSocketAdaptor.create(this);
        return socket;
    }
}
We can see that socket is created by ServerSocketAdaptor, a class implemented on top of ServerSocketChannelImpl. Its purpose is to fit the way we are used to working with ServerSocket: ServerSocketAdaptor extends ServerSocket and overrides its methods one by one, which gives us more options when writing this part of the code.
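A short sketch of what the adaptor gives us in practice, assuming a loopback bind on an ephemeral port; AdaptorDemo and adaptorReflectsChannel are hypothetical names. The ServerSocket view and the channel share the same underlying socket, so binding through one is visible through the other.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; not part of the JDK.
class AdaptorDemo {

    // Returns true if the ServerSocket view and the channel stay in sync.
    static boolean adaptorReflectsChannel() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ServerSocket adaptor = ch.socket();
            // socket() creates the adaptor lazily and then returns the same object.
            boolean sameInstance = adaptor == ch.socket();
            boolean unboundAtFirst = !adaptor.isBound();

            // Bind through the old-style API; the channel sees the result.
            adaptor.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress local = (InetSocketAddress) ch.getLocalAddress();

            return sameInstance
                    && unboundAtFirst
                    && local != null
                    && local.getPort() == adaptor.getLocalPort()
                    && adaptor.getChannel() == ch;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("adaptor and channel in sync: " + adaptorReflectsChannel());
    }
}
```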
We covered the implementation of java.nio.channels.spi.AbstractInterruptibleChannel#close earlier in this series, so here we only review it briefly and add a few new points:
//java.nio.channels.spi.AbstractInterruptibleChannel#closepublic final void close() throws IOException { synchronized (closeLock) { if (closed) return; closed = true; implCloseChannel(); }}//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannelprotected final void implCloseChannel() throws IOException { implCloseSelectableChannel(); // clone keys to avoid calling cancel when holding keyLock SelectionKey[] copyOfKeys = null; synchronized (keyLock) { if (keys != null) { copyOfKeys = keys.clone(); } } if (copyOfKeys != null) { for (SelectionKey k : copyOfKeys) { if (k != null) { k.cancel(); // invalidate and adds key to cancelledKey set } } } }//sun.nio.ch.ServerSocketChannelImpl#implCloseSelectableChannel@Overrideprotected void implCloseSelectableChannel() throws IOException { assert !isOpen(); boolean interrupted = false; boolean blocking; // set state to ST_CLOSING synchronized (stateLock) { assert state < ST_CLOSING; state = ST_CLOSING; blocking = isBlocking(); } // wait for any outstanding accept to complete if (blocking) { synchronized (stateLock) { assert state == ST_CLOSING; long th = thread; if (th != 0) { //If local thread isn't null, local socket will close in advance and tell thread to close. nd.preClose(fd); NativeThread.signal(th); // wait for accept operation to end while (thread != 0) { try { stateLock.wait(); } catch (InterruptedException e) { interrupted = true; } } } } } else { // non-blocking mode: wait for accept to complete acceptLock.lock(); acceptLock.unlock(); } // set state to ST_KILLPENDING synchronized (stateLock) { assert state == ST_CLOSING; state = ST_KILLPENDING; } // close socket if not registered with Selector //If it didn't register on selector, it will be directly killed which means close file descriptor. if (!isRegistered()) kill(); // restore interrupt status // It approved what we mentioned in last article if we interupt thread, we need to set the status of thread is interrupt. 
if (interrupted) Thread.currentThread().interrupt();}@Overridepublic void kill() throws IOException { synchronized (stateLock) { if (state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); } }}复制代码
close() in channel
We did not say much about close() in the previous article. It matters most when working with SocketChannel, because that is where data is exchanged between client and server: once a connection breaks, the Channel that is no longer useful has to be closed.
sun.nio.ch.ServerSocketChannelImpl#accept():
@Override
public SocketChannel accept() throws IOException {
        ...
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    } finally {
        acceptLock.unlock();
    }
}
The code above checks whether accepting from the remote address is permitted. If the check throws an exception, the SocketChannel that has just been created is closed.
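Two properties of the close() path reviewed earlier are easy to observe: closing twice is harmless because of the closed flag, and implCloseChannel cancels the channel's selection keys. The sketch below assumes a listener registered with one Selector; CloseDemo and keyInvalidAfterClose are hypothetical names.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; not part of the JDK.
class CloseDemo {

    // Returns true if close() marks the channel closed and invalidates its key.
    static boolean keyInvalidAfterClose() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            ch.configureBlocking(false); // registration requires non-blocking mode
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);

            ch.close();
            ch.close(); // second call returns immediately: closed is already true

            return !ch.isOpen() && !key.isValid();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("channel closed and key cancelled: " + keyInvalidAfterClose());
    }
}
```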
Another important use of close() is on the client side: if an exception occurs while the connection is being established, the Socket that has been created must be closed as well:
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
public static SocketChannel open(SocketAddress remote)
    throws IOException
{
    SocketChannel sc = open();
    try {
        sc.connect(remote);
    } catch (Throwable x) {
        try {
            sc.close();
        } catch (Throwable suppressed) {
            x.addSuppressed(suppressed);
        }
        throw x;
    }
    assert sc.isConnected();
    return sc;
}
When the ServerSocketChannelImpl class is loaded, it creates a SocketDispatcher. Through SocketDispatcher, I/O operations go to the native methods of whichever platform we run on, and the same is done for Socket:
//sun.nio.ch.SocketDispatcherclass SocketDispatcher extends NativeDispatcher{ static { IOUtil.load(); } //read int read(FileDescriptor fd, long address, int len) throws IOException { return read0(fd, address, len); } long readv(FileDescriptor fd, long address, int len) throws IOException { return readv0(fd, address, len); } //write int write(FileDescriptor fd, long address, int len) throws IOException { return write0(fd, address, len); } long writev(FileDescriptor fd, long address, int len) throws IOException { return writev0(fd, address, len); } //preclose FileDescriptior void preClose(FileDescriptor fd) throws IOException { preClose0(fd); } //close void close(FileDescriptor fd) throws IOException { close0(fd); } //-- Native methods static native int read0(FileDescriptor fd, long address, int len) throws IOException; static native long readv0(FileDescriptor fd, long address, int len) throws IOException; static native int write0(FileDescriptor fd, long address, int len) throws IOException; static native long writev0(FileDescriptor fd, long address, int len) throws IOException; static native void preClose0(FileDescriptor fd) throws IOException; static native void close0(FileDescriptor fd) throws IOException;}复制代码
FileDescriptor
FileDescriptor has appeared many times by now, so let's introduce it. A FileDescriptor is a handle that represents an open file, an open socket, or another source or sink of bytes. Its main practical use is to create a FileInputStream or FileOutputStream that contains it.
PS: Applications shouldn't create their own file descriptors.
Now let's turn to its source code:
public final class FileDescriptor {

    private int fd;

    private long handle;

    private Closeable parent;
    private List<Closeable> otherParents;
    private boolean closed;

    /**
     * true, if file is opened for appending.
     */
    private boolean append;

    static {
        initIDs();
    }

    /**
     * Cleans up the FileDescriptor if it was not closed explicitly.
     */
    private PhantomCleanable<FileDescriptor> cleanup;

    /**
     * Builds an invalid FileDescriptor; fd or handle is set later.
     */
    public FileDescriptor() {
        fd = -1;
        handle = -1;
    }

    /**
     * Used for standard input, output, and error only.
     * For Windows the corresponding handle is initialized.
     * For Unix the append mode is cached.
     * @param fd the raw fd number (0, 1, 2)
     */
    private FileDescriptor(int fd) {
        this.fd = fd;
        this.handle = getHandle(fd);
        this.append = getAppend(fd);
    }
    ...
}
The streams we usually use, java.lang.System#in, java.lang.System#out and java.lang.System#err, are backed by the following descriptors:
public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);
We can test whether a file descriptor is valid with the following method:
//java.io.FileDescriptor#valid
public boolean valid() {
    return (handle != -1) || (fd != -1);
}
If it returns true, the file descriptor represents a valid, open socket, file, or other active I/O connection. If it returns false, the descriptor is invalid.
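A minimal sketch of valid() in action, using a temporary file so that nothing outside the example is touched; FileDescriptorDemo and its methods are hypothetical names. A descriptor created with the public constructor is invalid, while one obtained from an open stream is valid until that stream is closed.

```java
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class; not part of the JDK.
class FileDescriptorDemo {

    // The public constructor leaves fd and handle at -1, so this is false.
    static boolean freshDescriptorIsValid() {
        return new FileDescriptor().valid();
    }

    // Returns { valid while the stream is open, valid after close() }.
    static boolean[] validBeforeAndAfterClose() throws IOException {
        Path tmp = Files.createTempFile("fd-demo", ".tmp");
        try {
            FileOutputStream out = new FileOutputStream(tmp.toFile());
            FileDescriptor fd = out.getFD();
            boolean before = fd.valid();
            out.close();
            boolean after = fd.valid();
            return new boolean[] { before, after };
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] result = validBeforeAndAfterClose();
        System.out.println("fresh descriptor valid: " + freshDescriptorIsValid());
        System.out.println("open stream valid: " + result[0]);
        System.out.println("closed stream valid: " + result[1]);
    }
}
```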
Interested readers can explore the rest of the source code on their own.
SocketChannel in NIO
We have already met SocketChannel; now let's explore it in detail.
As before, a socket channel is created by invoking open. A few points need attention:
- We can't create a channel for a socket that already exists.
- A newly created SocketChannel is open but not yet connected.
- Attempting an I/O operation on a channel that is not connected throws NotYetConnectedException.
- A socket channel is connected by invoking connect.
- Once the connection is established, the socket channel stays connected until it is closed.
- We can check whether a socketChannel is connected by using isConnected().

SocketChannel supports non-blocking connection:
- A SocketChannel can be created first, and connect() then starts the connection to the remote socket.
- finishConnect is invoked afterwards to complete the connection.
- isConnectionPending tells us whether a connection operation is still in progress.

SocketChannel supports asynchronous shutdown, which is very similar to the asynchronous close in the Channel class:
- If the input side of the socket is shut down by one thread while another thread is blocked in a read on the socket channel, the read in the blocked thread completes without reading anything and returns -1.
- If the output side of the socket is shut down by one thread while another thread is blocked in a write on the socket channel, the blocked thread receives AsynchronousCloseException.
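The connection-related points in the list can be tried out with a short sketch. It assumes a loopback listener so that no external server is needed; SocketChannelLifecycleDemo and its methods are hypothetical names, and the polling loop around finishConnect() is a simplification of what would normally be driven by a Selector with OP_CONNECT.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class SocketChannelLifecycleDemo {

    // I/O on an open but unconnected channel is rejected.
    static String readBeforeConnect() throws IOException {
        try (SocketChannel ch = SocketChannel.open()) {
            try {
                ch.read(ByteBuffer.allocate(1));
                return "read succeeded";
            } catch (NotYetConnectedException e) {
                return "NotYetConnectedException";
            }
        }
    }

    // Non-blocking connect: connect() starts it, finishConnect() completes it.
    static boolean nonBlockingConnect() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            // May return true at once (common on loopback) or false if still pending.
            boolean connected = client.connect(server.getLocalAddress());
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (!connected && System.nanoTime() < deadline) {
                Thread.sleep(10); // crude polling, for illustration only
                connected = client.finishConnect();
            }
            return connected && client.isConnected() && !client.isConnectionPending();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readBeforeConnect());
        System.out.println("non-blocking connect completed: " + nonBlockingConnect());
    }
}
```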
Now let's look at how it is implemented.
SocketChannel and its open()
//java.nio.channels.SocketChannel#open()public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel();}//java.nio.channels.SocketChannel#open(java.net.SocketAddress)//We don't need to invoke connect againpublic static SocketChannel open(SocketAddress remote) throws IOException{ //default is blocking SocketChannel sc = open(); try { sc.connect(remote); } catch (Throwable x) { try { sc.close(); } catch (Throwable suppressed) { x.addSuppressed(suppressed); } throw x; } assert sc.isConnected(); return sc;}//sun.nio.ch.SelectorProviderImpl#openSocketChannelpublic SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this);}//sun.nio.ch.SocketChannelImpl#SocketChannelImpl(java.nio.channels.spi.SelectorProvider)SocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); //invoke socket function,true is TCP this.fd = Net.socket(true); this.fdVal = IOUtil.fdVal(fd);}//sun.nio.ch.Net#socket(boolean)static FileDescriptor socket(boolean stream) throws IOException { return socket(UNSPEC, stream);}//sun.nio.ch.Net#socket(java.net.ProtocolFamily, boolean)static FileDescriptor socket(ProtocolFamily family, boolean stream) throws IOException { boolean preferIPv6 = isIPv6Available() && (family != StandardProtocolFamily.INET); return IOUtil.newFD(socket0(preferIPv6, stream, false, fastLoopback));}//sun.nio.ch.IOUtil#newFDpublic static FileDescriptor newFD(int i) { FileDescriptor fd = new FileDescriptor(); setfdVal(fd, i); return fd;}static native void setfdVal(FileDescriptor fd, int value);复制代码
Native source code behind Net.socket(true):
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean ignored)
{
    int fd;
    //TCP is SOCK_STREAM, UDP is SOCK_DGRAM; here stream=true
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
    //choose IPv6 or IPv4
    int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;

    //invoke Linux's socket function: domain is the protocol family,
    //type is the socket type, and protocol 0 selects the default
    //transport protocol for that type
    fd = socket(domain, type, 0);
    //error
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

    /* Disable IPV6_V6ONLY to ensure dual-socket support */
    if (domain == AF_INET6) {
        int arg = 0;
        //arg=1 restricts the IPv6 socket to IPv6 traffic only;
        //arg=0 lets it accept IPv4 traffic as well
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
                       sizeof(int)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_V6ONLY");
            close(fd);
            return -1;
        }
    }

    //SO_REUSEADDR has 4 applications:
    //1. socket1 holds a local address and port and is still in a waiting state,
    //   but socket2 wants to use the same address and port; the program needs this option.
    //2. It allows multiple instances of the same server to run on the same port,
    //   as long as each instance binds a different IP address.
    //3. It allows a single process to bind the same port to multiple sockets,
    //   as long as each socket binds a different IP address.
    //4. It allows completely identical address and port bindings,
    //   but only for UDP multicast, not for TCP.
    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }

#if defined(__linux__)
    if (type == SOCK_DGRAM) {
        int arg = 0;
        int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
        if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
            (errno != ENOPROTOOPT)) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IP_MULTICAST_ALL");
            close(fd);
            return -1;
        }
    }

    //IPV6_MULTICAST_HOPS controls the hop limit of outgoing multicast packets;
    //1 keeps them on the local network.
    //Please see (http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4)
    /* By default, Linux uses the route default */
    if (domain == AF_INET6 && type == SOCK_DGRAM) {
        int arg = 1;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_MULTICAST_HOPS");
            close(fd);
            return -1;
        }
    }
#endif
    return fd;
}
Linux 3.9 added the SO_REUSEPORT option, which allows multiple sockets to bind to exactly the same address and port. It does not matter whether the sockets are listening or not, or whether they are TCP or UDP: as long as each socket sets SO_REUSEPORT, the binding succeeds.
To prevent port hijacking, there is one restriction: all sockets that want to share an address and port must have the same effective user ID, so one user cannot steal a port from another. The kernel also handles SO_REUSEPORT sockets specially:
- For UDP sockets, the kernel tries to distribute incoming datagrams evenly across the sockets.
- For listening TCP sockets, the kernel tries to distribute new client connections (those returned by accept()) evenly across the sockets that share the address and port.
For example, a simple server can run multiple instances on SO_REUSEPORT sockets and get a simple load-balancing strategy for free, since the kernel does the dispatching.
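On the Java side, the same option is exposed as StandardSocketOptions.SO_REUSEPORT (available since Java 9). The following is only a minimal sketch of two listeners sharing one port; the class and method names are made up for illustration, and the probe at the start is there because not every platform supports the option.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the JDK.
public class ReusePortSketch {

    // Opens a listening channel on the given loopback port with SO_REUSEPORT set.
    // The option has to be set before bind().
    static ServerSocketChannel openShared(int port) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.setOption(StandardSocketOptions.SO_REUSEPORT, true);
        ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        return ssc;
    }

    // Returns true if two listeners ended up bound to the same port,
    // false if the platform does not offer SO_REUSEPORT at all.
    static boolean tryShare() throws IOException {
        try (ServerSocketChannel probe = ServerSocketChannel.open()) {
            if (!probe.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT)) {
                return false;
            }
        }
        // Port 0 lets the system pick a free port for the first listener.
        try (ServerSocketChannel first = openShared(0)) {
            int port = ((InetSocketAddress) first.getLocalAddress()).getPort();
            // The second bind to the same address and port succeeds only
            // because both sockets set SO_REUSEPORT.
            try (ServerSocketChannel second = openShared(port)) {
                return first.isOpen() && second.isOpen();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("two listeners on one port: " + tryShare());
    }
}
```

In a real deployment the two listeners would normally live in separate processes started by the same user, with the kernel spreading incoming connections between them.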
As the code above shows, once the socket has been created successfully, IOUtil.newFD is invoked to create the file descriptor. How do we know whether the socket is readable, writable, or in error? There are only three conditions of interest: read, write, and error. Since the socket is bound to the SocketChannel, we can bind the FileDescriptor to the channel as well and then query its status. FileDescriptor does not provide any public method for setting fd, so setfdVal is implemented as a native method:
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}
Anyone familiar with the Linux shell knows that it handles error output through redirection: the output goes to whichever channel the descriptor has been redirected to. File descriptors 0 and 1 point to the same channel there, and they also represent a state. In the same way, we can act on the socket's state by changing the interest ops in SelectionKey, which is why the states in SelectionKey are classified by input and output to support I/O operations.
Now we turn back to open() in SocketChannel. SelectorProvider.provider().openSocketChannel() returns a SocketChannelImpl instance. The SocketChannelImpl(SelectorProvider sp) constructor never assigns this.state, so it keeps the default value 0, which is ST_UNCONNECTED. The socket is also blocking by default.
In general, for asynchronous use we call the no-argument open() and then invoke configureBlocking(false) to switch the channel to non-blocking mode.
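As a minimal sketch of that usage (the class and method names here are illustrative only), the channel starts out blocking and unconnected and is switched to non-blocking before any connect call:

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the JDK.
public class NonBlockingOpenSketch {

    // Opens an unconnected channel and switches it to non-blocking mode.
    static SocketChannel openNonBlocking() throws IOException {
        SocketChannel sc = SocketChannel.open(); // blocking by default, state is unconnected
        sc.configureBlocking(false);             // must happen before registering with a Selector
        return sc;
    }

    public static void main(String[] args) throws IOException {
        try (SocketChannel sc = openNonBlocking()) {
            System.out.println("blocking=" + sc.isBlocking()
                    + " connected=" + sc.isConnected()
                    + " pending=" + sc.isConnectionPending());
        }
    }
}
```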
connect in SocketChannel
Source code for connect():
//sun.nio.ch.SocketChannelImpl#connect
@Override
public boolean connect(SocketAddress sa) throws IOException {
    InetSocketAddress isa = Net.checkAddress(sa);
    SecurityManager sm = System.getSecurityManager();
    if (sm != null)
        sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

    InetAddress ia = isa.getAddress();
    if (ia.isAnyLocalAddress())
        ia = InetAddress.getLocalHost();

    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                int n = 0;
                boolean blocking = isBlocking();
                try {
                    //support interruption by setting the current thread's Interruptible blocker
                    beginConnect(blocking, isa);
                    do {
                        //invoke the native connect; in blocking mode it waits
                        //until it succeeds or throws an exception
                        n = Net.connect(fd, ia, isa.getPort());
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } finally {
                    endConnect(blocking, (n > 0));
                }
                assert IOStatus.check(n);
                //connection success
                return n > 0;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, isa);
    }
}
beginConnect and endConnect extend begin() and end() from AbstractInterruptibleChannel. For a non-blocking channel we do not need to care about interruption during the connection; interruption only matters when the thread blocks while waiting.
//sun.nio.ch.SocketChannelImpl#beginConnect
private void beginConnect(boolean blocking, InetSocketAddress isa)
    throws IOException
{
    //begin() is entered only in blocking mode
    if (blocking) {
        // set hook for Thread.interrupt
        begin();
    }
    synchronized (stateLock) {
        //open by default, unless close has been invoked
        ensureOpen();
        //check state of connection
        int state = this.state;
        if (state == ST_CONNECTED)
            throw new AlreadyConnectedException();
        if (state == ST_CONNECTIONPENDING)
            throw new ConnectionPendingException();
        //assert the channel is unconnected, then mark it as connecting
        assert state == ST_UNCONNECTED;
        //connecting
        this.state = ST_CONNECTIONPENDING;

        //executed only if bind has not been invoked
        if (localAddress == null)
            NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
        remoteAddress = isa;

        if (blocking) {
            // record thread so it can be signalled if needed
            readerThread = NativeThread.current();
        }
    }
}
During the connection we only need to focus on the connection state: ST_UNCONNECTED, ST_CONNECTED, ST_CONNECTIONPENDING, ST_CLOSING, ST_KILLPENDING, ST_KILLED. This state is shared, so it can be operated on by multiple threads. For that reason state is declared volatile, and any change to it must be made while holding stateLock in a synchronized block:
//sun.nio.ch.SocketChannelImpl#endConnect
private void endConnect(boolean blocking, boolean completed)
    throws IOException
{
    endRead(blocking, completed);
    //when n > 0 the connection succeeded and the state becomes ST_CONNECTED
    if (completed) {
        synchronized (stateLock) {
            if (state == ST_CONNECTIONPENDING) {
                localAddress = Net.localAddress(fd);
                state = ST_CONNECTED;
            }
        }
    }
}

//sun.nio.ch.SocketChannelImpl#endRead
private void endRead(boolean blocking, boolean completed)
    throws AsynchronousCloseException
{
    //executed only when blocking is true
    if (blocking) {
        synchronized (stateLock) {
            readerThread = 0;
            // notify any thread waiting in implCloseSelectableChannel
            if (state == ST_CLOSING) {
                stateLock.notifyAll();
            }
        }
        //end is paired with begin; if the thread was interrupted, it throws ClosedByInterruptException
        // remove hook for Thread.interrupt
        end(completed);
    }
}
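The state handling in beginConnect and endConnect follows a common pattern: a volatile field that any thread may read without locking, with every write guarded by a lock. The class below is a stripped-down sketch of that pattern only. Its name, the numeric constants other than 0 for ST_UNCONNECTED, and the use of IllegalStateException are illustrative assumptions, not the JDK's code.

```java
// Hypothetical demo class modelled loosely on the state handling in SocketChannelImpl.
public class ConnectionStateSketch {
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_CONNECTIONPENDING = 1; // illustrative value
    private static final int ST_CONNECTED = 2;         // illustrative value

    private final Object stateLock = new Object();
    // volatile: lock-free reads always see the latest write; writes need stateLock
    private volatile int state = ST_UNCONNECTED;

    // Lock-free read, safe because the field is volatile.
    boolean isConnected() {
        return state == ST_CONNECTED;
    }

    boolean isConnectionPending() {
        return state == ST_CONNECTIONPENDING;
    }

    // Check-then-set must be atomic, hence the lock.
    void beginConnect() {
        synchronized (stateLock) {
            if (state == ST_CONNECTED)
                throw new IllegalStateException("already connected");
            if (state == ST_CONNECTIONPENDING)
                throw new IllegalStateException("connection pending");
            state = ST_CONNECTIONPENDING;
        }
    }

    // Only a completed connect moves the state forward.
    void endConnect(boolean completed) {
        if (completed) {
            synchronized (stateLock) {
                if (state == ST_CONNECTIONPENDING) {
                    state = ST_CONNECTED;
                }
            }
        }
    }

    public static void main(String[] args) {
        ConnectionStateSketch s = new ConnectionStateSketch();
        s.beginConnect();
        s.endConnect(false); // e.g. a non-blocking connect still in progress
        System.out.println("pending=" + s.isConnectionPending());
        s.endConnect(true);  // e.g. what a later finishConnect would trigger
        System.out.println("connected=" + s.isConnected());
    }
}
```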
Now we go to Net.connect(fd, ia, isa.getPort()):
//sun.nio.ch.Net#connect
static int connect(FileDescriptor fd, InetAddress remote, int remotePort)
    throws IOException
{
    return connect(UNSPEC, fd, remote, remotePort);
}

//sun.nio.ch.Net#connect
static int connect(ProtocolFamily family, FileDescriptor fd, InetAddress remote, int remotePort)
    throws IOException
{
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    return connect0(preferIPv6, fd, remote, remotePort);
}
This function eventually invokes the native method:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6,
                             jobject fdo, jobject iao, jint port)
{
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv;
    //convert the address into struct sockaddr format
    if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) != 0) {
        return IOS_THROWN;
    }
    //pass fd and sockaddr to connect to the remote server, i.e. the TCP three-way handshake
    //with configureBlocking(false) the call does not block; otherwise it
    //blocks until it succeeds, times out, or fails
    rv = connect(fdval(env, fdo), &sa.sa, sa_len);
    //0 means success; on failure, use errno to get the details
    if (rv != 0) {
        //non-blocking, connection has not been established yet (-2)
        if (errno == EINPROGRESS) {
            return IOS_UNAVAILABLE;
        } else if (errno == EINTR) {
            //interrupted (-3)
            return IOS_INTERRUPTED;
        }
        return handleSocketError(env, errno);
    }
    //connection established; a TCP connect normally takes time,
    //but on the local network it can complete immediately.
    //In non-blocking mode the call generally returns IOS_UNAVAILABLE
    return 1;
}
As the comments above show, in non-blocking mode, if the connection has not yet been established, the call returns -2 (IOS_UNAVAILABLE). From beginConnect we know that the state at that point is ST_CONNECTIONPENDING. So when does it become ST_CONNECTED in non-blocking mode? To answer that, we look at sun.nio.ch.SocketChannelImpl#finishConnect.
finishConnect in SocketChannelImpl
//java.net.Socket#Socket
private Socket(SocketAddress address, SocketAddress localAddr,
               boolean stream) throws IOException {
    setImpl();

    // backward compatibility
    if (address == null)
        throw new NullPointerException();

    try {
        createImpl(stream);
        if (localAddr != null)
            bind(localAddr);
        connect(address);
    } catch (IOException | IllegalArgumentException | SecurityException e) {
        try {
            close();
        } catch (IOException ce) {
            e.addSuppressed(ce);
        }
        throw e;
    }
}
Here, we obtain a new instance of sun.nio.ch.SocketAdaptor by first invoking java.nio.channels.SocketChannel#open() and then invoking socket().
Now look at connect in SocketAdaptor:
//sun.nio.ch.SocketAdaptor#connect
public void connect(SocketAddress remote) throws IOException {
    connect(remote, 0);
}

public void connect(SocketAddress remote, int timeout) throws IOException {
    if (remote == null)
        throw new IllegalArgumentException("connect: The address can't be null");
    if (timeout < 0)
        throw new IllegalArgumentException("connect: timeout can't be negative");

    synchronized (sc.blockingLock()) {
        if (!sc.isBlocking())
            throw new IllegalBlockingModeException();

        try {
            //with no timeout set, wait until the connection succeeds or an exception occurs
            // no timeout
            if (timeout == 0) {
                sc.connect(remote);
                return;
            }

            //with a timeout set, the socket is switched to non-blocking
            // timed connect
            sc.configureBlocking(false);
            try {
                if (sc.connect(remote))
                    return;
            } finally {
                try {
                    sc.configureBlocking(true);
                } catch (ClosedChannelException e) { }
            }

            long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
            long to = timeout;
            for (;;) {
                //track the remaining time and keep polling for the connection
                //within the permitted time; on timeout, close the socket
                long startTime = System.nanoTime();
                if (sc.pollConnected(to)) {
                    boolean connected = sc.finishConnect();
                    //see the explanation of assert below
                    assert connected;
                    break;
                }
                timeoutNanos -= System.nanoTime() - startTime;
                if (timeoutNanos <= 0) {
                    try {
                        sc.close();
                    } catch (IOException x) { }
                    throw new SocketTimeoutException();
                }
                to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
            }

        } catch (Exception x) {
            Net.translateException(x, true);
        }
    }
}
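The timed path of this adaptor can be exercised through public API alone. The sketch below is a rough illustration under a few assumptions: it uses a throwaway listener on the loopback address as the remote side, and the class and method names are invented for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the JDK.
public class AdaptorConnectSketch {

    // Connects through the Socket view of a blocking SocketChannel with a timeout.
    // Returns true if the channel ends up connected and back in blocking mode.
    static boolean timedConnect(int timeoutMillis) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Local listener on a system-chosen port, standing in for a remote server.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel sc = SocketChannel.open()) {
                Socket socket = sc.socket();   // adaptor backed by the channel
                socket.connect(server.getLocalAddress(), timeoutMillis);
                return sc.isConnected()
                        && sc.isBlocking()     // blocking mode is restored afterwards
                        && socket.getChannel() == sc;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected via adaptor: " + timedConnect(1000));
    }
}
```

Calling connect on the adaptor while the channel is non-blocking would instead fail with IllegalBlockingModeException, as the first check in the synchronized block shows.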
The Java assert keyword was introduced in Java 1.4, so it has been around for quite a while. To avoid conflicts with older Java code that used assert as an identifier, **assertion checks are disabled by default at run time**, and every assert statement is ignored. To turn them on, pass -enableassertions or -ea to the JVM.
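A small idiom makes this behaviour visible: put a side effect inside an assert and see whether it ran. The class name below is only for illustration.

```java
// Hypothetical demo class, not part of the JDK.
public class AssertStatusSketch {

    // Returns true only when the JVM was started with -ea / -enableassertions
    // (or assertions were otherwise enabled for this class).
    static boolean assertionsEnabled() {
        boolean enabled = false;
        // The assignment is evaluated only if assertions are enabled;
        // otherwise the whole statement is skipped.
        assert enabled = true;
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
    }
}
```

Running it once with `java AssertStatusSketch` and once with `java -ea AssertStatusSketch` shows the two cases.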
//sun.nio.ch.SocketChannelImpl#finishConnect
@Override
public boolean finishConnect() throws IOException {
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                // no-op if already connected
                if (isConnected())
                    return true;

                boolean blocking = isBlocking();
                boolean connected = false;
                try {
                    beginFinishConnect(blocking);
                    int n = 0;
                    if (blocking) {
                        do {
                            n = checkConnect(fd, true);
                        } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
                    } else {
                        n = checkConnect(fd, false);
                    }
                    connected = (n > 0);
                } finally {
                    endFinishConnect(blocking, connected);
                }
                assert (blocking && connected) ^ !blocking;
                return connected;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, remoteAddress);
    }
}

//sun.nio.ch.SocketChannelImpl#checkConnect
private static native int checkConnect(FileDescriptor fd, boolean block)
    throws IOException;
beginFinishConnect and endFinishConnect are similar to sun.nio.ch.SocketChannelImpl#beginConnect and sun.nio.ch.SocketChannelImpl#endConnect. Beyond those, we only need to focus on the core logic, checkConnect(fd, true), which is also a native method:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
                                               jobject fdo, jboolean block)
{
    int error = 0;
    socklen_t n = sizeof(int);
    //get the fd value from the FileDescriptor
    jint fd = fdval(env, fdo);
    int result = 0;
    struct pollfd poller;
    //file descriptor to poll
    poller.fd = fd;
    //requested event: writable
    poller.events = POLLOUT;
    //returned events
    poller.revents = 0;
    //-1 -> block; 0 -> return immediately without blocking the thread
    result = poll(&poller, 1, block ? -1 : 0);
    //result < 0 means the call failed
    if (result < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "poll failed");
            return IOS_THROWN;
        }
    }
    //non-blocking and 0 -> connection still pending
    if (!block && (result == 0))
        return IOS_UNAVAILABLE;
    //result > 0: the socket is ready to write or has an error
    if (result > 0) {
        errno = 0;
        result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
        //error
        if (result < 0) {
            return handleSocketError(env, errno);
        //handle the pending socket error
        } else if (error) {
            return handleSocketError(env, error);
        } else if ((poller.revents & POLLHUP) != 0) {
            return handleSocketError(env, ENOTCONN);
        }
        //socket is ready to write, which means the connection has been established
        // connected
        return 1;
    }
    return 0;
}
The detailed process is explained in the comments. The blocking and non-blocking branches correspond to the branches in sun.nio.ch.SocketChannelImpl#finishConnect. The native code uses poll to check the socket's status, and from the result it knows whether the connection succeeded. Because finishConnect returns immediately in non-blocking mode, sun.nio.ch.SocketAdaptor#connect handles this by checking the connection status in a loop. This approach is not recommended in NIO programming, however; it is only a half-measure. Instead, we suggest registering the channel with a Selector for ops=OP_CONNECT, obtaining the SelectionKey once it is ready, and then invoking finishConnect to complete the connection.
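The recommended flow can be sketched roughly as follows. This is a minimal example under stated assumptions, not a production event loop: it connects to a throwaway listener on the loopback address, handles a single channel, and the class and method names are invented for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the JDK.
public class SelectorConnectSketch {

    // Performs a non-blocking connect and completes it via OP_CONNECT + finishConnect.
    // timeoutMillis is assumed to be positive.
    static boolean connectWithSelector(long timeoutMillis) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel sc = SocketChannel.open()) {
            // Local listener on a system-chosen port, standing in for a remote server.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            sc.configureBlocking(false);
            if (sc.connect(server.getLocalAddress())) {
                return true; // local connections may complete immediately
            }
            // Connection is pending: ask the selector to report when it can be finished.
            sc.register(selector, SelectionKey.OP_CONNECT);

            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (System.nanoTime() < deadline) {
                selector.select(timeoutMillis);
                for (SelectionKey key : selector.selectedKeys()) {
                    // finishConnect moves the channel to the connected state.
                    if (key.isConnectable() && sc.finishConnect()) {
                        key.interestOps(0); // no longer interested in OP_CONNECT
                        return true;
                    }
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + connectWithSelector(2000));
    }
}
```

After finishConnect returns true, a real client would typically switch the key's interest ops to OP_READ or OP_WRITE rather than clearing them.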
Because finishConnect is what updates the state to ST_CONNECTED, and that state is checked whenever we read or write, we must invoke finishConnect before doing any I/O on the channel.
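The state check on the I/O path is easy to observe. The sketch below uses a channel that was never connected at all, which is the simplest deterministic case; a channel whose connect is still pending is expected to be rejected by the same check, though that case is harder to reproduce reliably. The class and method names are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the JDK.
public class NotYetConnectedSketch {

    // Returns true if reading from an unconnected channel is rejected.
    static boolean readFailsBeforeConnect() throws IOException {
        try (SocketChannel sc = SocketChannel.open()) {
            try {
                sc.read(ByteBuffer.allocate(16));
                return false; // not expected: the channel was never connected
            } catch (NotYetConnectedException expected) {
                return true;  // the channel has not reached the connected state
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read rejected before connect: " + readFailsBeforeConnect());
    }
}
```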
Along the way we have mentioned Selector and SelectionKey. We will explain both in detail in the next article.