이번 시간에는 Proactor 패턴에 대해 포스팅해보려고 한다. Proactor 패턴은 비동기로 처리할 수 있는 작업들을 demultiplexing한 뒤, 작업까지 전부 비동기로 처리한다. 작업이 완료되면 비동기 프로세스가 completion dispatcher에게 이벤트를 넘기고, dispatcher는 적절한 completion handler에 이벤트를 dispatch한다. completion handler에 이벤트가 dispatch되면 completion handler는 미리 정해진 콜백을 호출하여 이벤트를 처리한다. 전체적인 프로액터 그림은 아래와 같다.
1. Demultiplexer class파일을 만들고 다음과 같이 코딩한다.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
 * Completion handler for the fixed-size header read issued on a freshly
 * accepted connection.
 *
 * <p>Once the header buffer is full, the header text is looked up in the
 * {@link NioHandleMap}; the matching {@link NioEventHandler} is initialized
 * with the channel and a new buffer of {@code getDataSize()} bytes, and a
 * second asynchronous read is started with that handler as the completion
 * handler.
 *
 * <p>NOTE(review): handlers in the map are shared instances, and
 * {@code initialize} stores the channel on the handler itself, so two
 * connections using the same header at the same time can overwrite each
 * other's channel. Fixing that needs a per-connection handler instance,
 * which would change the {@code NioEventHandler} contract.
 */
public class Demultiplexer implements CompletionHandler<Integer, ByteBuffer>
{
    private final AsynchronousSocketChannel channel;
    private final NioHandleMap handleMap;

    /**
     * @param channel   the accepted connection whose header is being read
     * @param handleMap header-to-handler registry used to pick the handler
     */
    public Demultiplexer(AsynchronousSocketChannel channel, NioHandleMap handleMap)
    {
        this.channel = channel;
        this.handleMap = handleMap;
    }

    /**
     * Called when the header read completes.
     *
     * @param result number of bytes read, or -1 if the peer closed the stream
     * @param buffer the header buffer that was passed to the read
     */
    @Override
    public void completed(Integer result, ByteBuffer buffer)
    {
        if (result == -1)
        {
            closeChannel();
            return;
        }

        // A single read may deliver only part of the header; keep reading
        // into the same buffer until it is full instead of decoding padding.
        if (buffer.hasRemaining())
        {
            channel.read(buffer, buffer, this);
            return;
        }

        buffer.flip();
        // Decode exactly the bytes that were read, with an explicit charset
        // rather than the platform default.
        String header = java.nio.charset.StandardCharsets.UTF_8.decode(buffer).toString();

        NioEventHandler handler = handleMap.get(header);
        if (handler == null)
        {
            // Unknown header: there is nobody to hand the payload to.
            System.err.println("Unknown header, closing connection: " + header);
            closeChannel();
            return;
        }

        ByteBuffer newBuffer = ByteBuffer.allocate(handler.getDataSize());
        handler.initialize(channel, newBuffer);
        channel.read(newBuffer, newBuffer, handler);
    }

    /**
     * Called when the header read fails; reports the error and closes the
     * channel so the connection is not leaked.
     *
     * @param exc    the cause of the failure
     * @param buffer the header buffer that was passed to the read
     */
    @Override
    public void failed(Throwable exc, ByteBuffer buffer)
    {
        exc.printStackTrace();
        closeChannel();
    }

    /** Closes the connection, reporting (not propagating) any I/O error. */
    private void closeChannel()
    {
        try
        {
            channel.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}
2. Dispatcher class파일을 만들고 다음과 같이 코딩한다.
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
 * Completion handler for {@code accept} on the server socket.
 *
 * <p>For every accepted connection it immediately re-arms {@code accept} on
 * the listener and starts an asynchronous read of the {@link #HEADER_SIZE}-byte
 * header, handing the result to a new {@link Demultiplexer}.
 */
public class Dispatcher implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>
{
    /** Number of bytes read as the message header before dispatching. */
    private static final int HEADER_SIZE = 6;

    private final NioHandleMap handleMap;

    /**
     * @param handleMap header-to-handler registry passed on to each {@link Demultiplexer}
     */
    public Dispatcher(NioHandleMap handleMap)
    {
        this.handleMap = handleMap;
    }

    /**
     * Called when a connection has been accepted.
     *
     * @param channel  the newly accepted connection
     * @param listener the server socket the accept was issued on
     */
    @Override
    public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel listener)
    {
        // Re-arm accept first so further clients are not kept waiting while
        // this connection is being set up.
        listener.accept(listener, this);

        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
        channel.read(buffer, buffer, new Demultiplexer(channel, handleMap));
    }

    /**
     * Called when an accept fails. The error is reported instead of being
     * silently dropped; accept is not re-armed here.
     *
     * @param exc      the cause of the failure
     * @param listener the server socket the accept was issued on
     */
    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel listener)
    {
        System.err.println("Accept failed: " + exc);
        exc.printStackTrace();
    }
}
3. NioEventHandler interface파일을 만들어 다음과 같이 코딩한다.
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
 * A completion handler for the payload read of one message type.
 *
 * <p>Implementations are registered under their {@link #getHeader() header}
 * and receive the payload read's completion callbacks.
 */
public interface NioEventHandler extends CompletionHandler<Integer, ByteBuffer>
{
    /**
     * @return the header string under which this handler is registered
     */
    String getHeader();

    /**
     * @return the size, in bytes, of the buffer to allocate for the payload read
     */
    int getDataSize();

    /**
     * Supplies the connection and payload buffer before the payload read is started.
     *
     * @param channel the connection the payload is read from
     * @param buffer  the buffer the payload is read into
     */
    void initialize(AsynchronousSocketChannel channel, ByteBuffer buffer);
}
4. NioHandleMap class파일을 만들고 다음과 같이 코딩한다.
import java.util.HashMap;
/**
 * Registry mapping a message header string to the {@link NioEventHandler}
 * that processes messages carrying that header.
 */
public class NioHandleMap extends HashMap<String, NioEventHandler> {
}
5. NioSayHelloEventHandler class파일을 만들고 다음과 같이 코딩한다.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.StringTokenizer;
public class NioSayHelloEventHandler implements NioEventHandler
{
private static final int NUM_TOKEN = 2;
private AsynchronousSocketChannel channel;
private ByteBuffer buffer;
@Override
public String getHeader()
{
return "0x5001";
}
@Override
public int getDataSize()
{
return 512;
}
@Override
public void initialize(AsynchronousSocketChannel channel, ByteBuffer buffer)
{
this.channel = channel;
this.buffer = buffer;
}
@Override
public void completed(Integer result, ByteBuffer attachment)
{
if (result == -1)
{
try {
channel.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
else if (result > 0)
{
buffer.flip();
String message = new String(buffer.array());
String[] params = new String[NUM_TOKEN];
StringTokenizer token = new StringTokenizer(message, "|");
int i = 0;
while (token.hasMoreTokens()) {
params[i] = token.nextToken();
i++;
}
sayHello(params);
buffer.clear();
try
{
channel.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment)
{
}
private void sayHello(String[] params)
{
System.out.println("SayHello / NAME: " + params[0] + " / AGE: " + params[1]);
}
6. NioUpdateProfileEventHandler class파일을 만들고 다음과 같이 코딩한다.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.StringTokenizer;
/**
 * Handles messages with header {@code "0x6001"}: parses a
 * {@code |}-separated payload into profile fields and prints them.
 *
 * <p>NOTE(review): one instance is shared by all connections, while
 * {@link #initialize} stores the current channel on the instance. Concurrent
 * connections can therefore overwrite each other's channel. The payload is
 * read from the per-operation {@code attachment} buffer to avoid the same
 * problem for the buffer; the channel cannot be fixed without changing the
 * {@code NioEventHandler} contract.
 */
public class NioUpdateProfileEventHandler implements NioEventHandler
{
    /** Maximum number of payload fields that are kept. */
    private static final int NUM_TOKEN = 6;

    private AsynchronousSocketChannel channel;
    private ByteBuffer buffer;

    @Override
    public String getHeader()
    {
        return "0x6001";
    }

    @Override
    public int getDataSize()
    {
        return 1024;
    }

    @Override
    public void initialize(AsynchronousSocketChannel channel, ByteBuffer buffer)
    {
        this.channel = channel;
        this.buffer = buffer;
    }

    /**
     * Called when the payload read completes.
     *
     * @param result     number of bytes read, or -1 if the peer closed the stream
     * @param attachment the payload buffer that was passed to the read
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment)
    {
        if (result == -1)
        {
            closeChannel();
        }
        else if (result > 0)
        {
            attachment.flip();
            // Decode only the bytes actually read (position..limit). Decoding
            // the whole backing array would append the unused NUL padding to
            // the last field.
            String message = java.nio.charset.StandardCharsets.UTF_8.decode(attachment).toString();

            String[] params = new String[NUM_TOKEN];
            StringTokenizer token = new StringTokenizer(message, "|");
            // Stop at NUM_TOKEN so an over-long message cannot overrun the array.
            for (int i = 0; i < NUM_TOKEN && token.hasMoreTokens(); i++)
            {
                params[i] = token.nextToken();
            }

            updateProfile(params);
            attachment.clear();
            closeChannel();
        }
    }

    /**
     * Called when the payload read fails; reports the error and closes the
     * channel so the connection is not leaked.
     */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment)
    {
        exc.printStackTrace();
        closeChannel();
    }

    /** Prints the profile fields; missing fields are printed as {@code null}. */
    private void updateProfile(String[] params)
    {
        System.out.println("UpdateProfile / " + "ID: " + params[0] + " / " + "PASSWORD: " + params[1] + " / "
                + "NAME: " + params[2] + " / " + "AGE: " + params[3] + " / " + "GENDER: " + params[4]);
    }

    /** Closes the current channel, reporting (not propagating) any I/O error. */
    private void closeChannel()
    {
        if (channel == null)
        {
            return;
        }
        try
        {
            channel.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}
7. ServerInitializer class파일을 만들고 다음과 같이 코딩한다.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Entry point of the server: registers the event handlers, creates the
 * asynchronous channel group, binds the listening socket and issues the first
 * {@code accept} with a {@link Dispatcher}.
 */
public class ServerInitializer
{
    private static final int PORT = 5000;
    private static final int THREAD_POOL_SIZE = 30;
    private static final int INITIAL_SIZE = 20;
    private static final int BACKLOG = 50;

    /**
     * Starts the server on {@link #PORT}.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        System.out.println("SERVER START at PORT: " + PORT + "!");

        NioHandleMap handleMap = new NioHandleMap();
        NioEventHandler sayHelloHandler = new NioSayHelloEventHandler();
        NioEventHandler updateProfileHandler = new NioUpdateProfileEventHandler();
        handleMap.put(sayHelloHandler.getHeader(), sayHelloHandler);
        handleMap.put(updateProfileHandler.getHeader(), updateProfileHandler);

        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        AsynchronousChannelGroup group = null;
        try
        {
            group = AsynchronousChannelGroup.withCachedThreadPool(executor, INITIAL_SIZE);
            AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
            listener.bind(new InetSocketAddress(PORT), BACKLOG);
            listener.accept(listener, new Dispatcher(handleMap));
        }
        catch (IOException e)
        {
            e.printStackTrace();
            // Startup failed (e.g. the port is already in use). The pool
            // threads are non-daemon, so without an explicit shutdown the
            // process would stay alive with nothing listening.
            shutdown(group, executor);
        }
    }

    /** Releases the channel group (if it was created) and the executor. */
    private static void shutdown(AsynchronousChannelGroup group, ExecutorService executor)
    {
        if (group != null)
        {
            try
            {
                group.shutdownNow();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
        executor.shutdownNow();
    }
}
9. TestClient class파일을 만들고 다음과 같이 코딩한다.
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
 * Minimal test client: opens one connection per message to 127.0.0.1:5000,
 * writes the message and closes the connection.
 */
public class TestClient {
    /**
     * Sends one SayHello message and one UpdateProfile message.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        System.out.println("Client ON");
        try
        {
            send("0x5001|이건주|26");
            send("0x6001|Lee|1234|이건주|26|남성");
        }
        catch (UnknownHostException e)
        {
            e.printStackTrace();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Opens a connection, writes {@code message} and closes the connection.
     *
     * @param message the full message, header included
     * @throws IOException if connecting or writing fails
     */
    private static void send(String message) throws IOException
    {
        // try-with-resources closes the socket even when the write throws.
        try (Socket socket = new Socket("127.0.0.1", 5000);
             OutputStream out = socket.getOutputStream())
        {
            // Explicit charset: the message contains non-ASCII text, and the
            // platform default differs between machines.
            out.write(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            out.flush();
        }
    }
}
10. 모든 파일을 컴파일한 뒤 서버와 클라이언트를 실행하면 다음과 같은 결과를 볼 수 있다.
서버가 정상적으로 기동되어 클라이언트가 보낸 메시지를 잘 받고 있다.