Reactor Thread

Server/Reactor 2016. 8. 11. 00:20
반응형
SMALL

이번 시간엔 Reactor Thread에 대해 포스팅 해보려고 한다. 서버는 한번에 한가지 일만을 처리하지 않는다. 동시다발적으로 여러가지 프로토콜을 처리하는것이 서버이다. 이렇게 동시에 처리 할 수 있도록 해주는 것이 Thread기능이다. 하지만 Thread도 적당히 있으면 좋은데 이것도 너무 많아지게 되면 갑자기 처리량이 증가해서 서버는 당연히 느려지게 된다. 그러다보면 과부하가 발생하는것은 기본이다. 그러다보면 메모리에도 부담이 가게되고 여러모로 악재가 겹친다. 그래서 사용되는 것이 Thread Pool기능이다. 접속이 들어오기 전에 미리 스레드를 만들어서 스레드를 생성하는 지연을 줄이고 스레드의 갯수를 제한하고 다 쓴 스레드를 재활용한다. 



스레드풀을 만드는 방법은 다음과 같다.


1. Demultiplexer class파일을 만들고 다음과같이 코딩한다.


import java.io.IOException;

import java.io.InputStream;

import java.net.Socket;


public class Demultiplexer implements Runnable

{

   private final int HEADER_SIZE = 6;

   private Socket socket;

   private HandleMap handleMap;

public Demultiplexer(Socket socket, HandleMap handleMap)

{

    this.socket = socket;

    this.handleMap = handleMap;

}


public void run()

{

try

{

   InputStream inputStream = socket.getInputStream();

   byte[] buffer = new byte[HEADER_SIZE];

   inputStream.read(buffer);

   String header = new String(buffer);

   handleMap.get(header).hanedleEvent(inputStream);

   socket.close();

}

catch (IOException e)

{

    e.printStackTrace();

   }

  }

}


2. Dispatcher interface를 만들어주고 다음과 같이 코딩한다.


import java.net.ServerSocket;


public interface Dispatcher 

{

   public void dispatch(ServerSocket serverSocket, HandleMap handler);


}


3. EventHandler interface를 만들어주고 다음과 같이 코딩한다.

import java.io.InputStream;


public interface EventHandler 

{

    public String getHandler();

    public void handleEvent(InputStream inputStream);


}


4.  HandleMap class를 만들고 다음과 같이 코딩한다.


import java.util.HashMap;


public class HandleMap extends HashMap<String, EventHandler> 

{


}


5. Reactor class를 만들어주고 다음과 같이 코딩한다.


import java.io.IOException;

import java.net.ServerSocket;


public class Reactor 

{

   private ServerSocket serverSocket;

   private HandleMap handleMap;

public Reactor(int port)

{

    handleMap = new HandleMap();

try

{

    serverSocket = new ServerSocket(port);

}

catch (IOException e)

{

   e.printStackTrace();

}

}

public void startServer() 

{

   Dispatcher dispatcher = new ThreadPoolDispatcher();

   dispatcher.dispatch(serverSocket, handleMap);

}

public void registerHandler(EventHandler handler)

{

   handleMap.put(handler.getHandler(), handler);

}

public void removeHandler(EventHandler handler)

{

   handleMap.remove(handler.getHandler());

}

}


6. Serverinitializer class를 만들어주고 다음과 같이 코딩한다.

import java.io.IOException;

import java.net.ServerSocket;



public class Serverinitializer 

{


public static void main(String[] args

{

  int port = 5000;

  System.out.println("Server ON :" + port);

  Reactor reactor = new Reactor(port);

  reactor.registerHandler(new StreamSayHelloEventHandler());

  reactor.registerHandler(new StreamUpdateProfileEventHandler());

  reactor.startServer();

}

}


7. StreamSayHelloEventHandler class를 만들어주고 다음과 같이 코딩한다.


import java.io.IOException;

import java.io.InputStream;

import java.util.StringTokenizer;


public class StreamSayHelloEventHandler implements EventHandler

{

   private static final int DATA_SIZE = 512;

   private static final int TOKEN_NUM = 2;

@Override

public String getHandler() 

{

   return "0x5001";

}

public void handleEvent(InputStream inputStream)

{

try

{

   byte[] buffer = new byte[DATA_SIZE];

   inputStream.read(buffer);

   String data = new String(buffer);

   String[] params = new String[TOKEN_NUM];

   StringTokenizer token = new StringTokenizer(data, "|");

   int i = 0;

 while(token.hasMoreTokens())

{

   params[i] = token.nextToken();

   ++i;

}

   sayHello(params);

}

catch (IOException e)

{

   e.printStackTrace();

}

}


8. StreamUpdateProfileEventHandler class를 생성하고 다음과 같이 코딩한다.


import java.io.IOException;

import java.io.InputStream;

import java.util.StringTokenizer;


public class StreamUpdateProfileEventHandler implements EventHandler

{

private static final int DATA_SIZE = 1024;

private static final int TOKEN_NUM = 5;

@Override

public String getHandler() 

{

return "0x6001";

}

public void handleEvent(InputStream inputStream)

{

try

{

byte[] buffer = new byte[DATA_SIZE];

inputStream.read(buffer);

String data = new String(buffer);

String[] params = new String[TOKEN_NUM];

StringTokenizer token = new StringTokenizer(data, "|");

int i = 0;

while(token.hasMoreTokens())

{

params[i] = token.nextToken();

++i;

}

updateProfile(params);

}

catch (IOException e)

{

e.printStackTrace();

}

private void updateProfile(String[] params)

{

System.out.println("UpdateProfile ->" +

            " id :" + params[0] +

            " password : " + params[1] +

            " name : " + params[2] +

            " age : " + params[3] + 

            " gender: " + params[4]);

}

}

private void sayHello(String[] params)

{

   System.out.println("SayHello -> name: " + params[0] + "age : " + params[1]);

}

}


9. TestClient class를 생성하고 다음과 같이 코딩해준다.


import java.io.IOException;

import java.io.OutputStream;

import java.net.Socket;

import java.net.UnknownHostException;


public class TestClient {


public static void main(String[] args

{

   System.out.println("Client ON");

   try

   {

      String message;

      Socket socket = new Socket("127.0.0.1", 5000);

      OutputStream out = socket.getOutputStream();

      message = "0x5001|이건주|26";

      out.write(message.getBytes());

      socket.close();


      Socket socket2 = new Socket("127.0.0.1", 5000);

      OutputStream out2 = socket2.getOutputStream();

      message = "0x6001|Lee|1234|이건주|26|남성";

      out2.write(message.getBytes());

      socket2.close();

    } 

    catch (UnknownHostException e)

    {

        e.printStackTrace();

    } 

    catch (IOException e)

   {

       e.printStackTrace();

   }

}

}


10. ThreadPerDispatcher class를 생성하고 다음과 같이 코딩한다.

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;


public class ThreadPerDispatcher implements Dispatcher

{

  public void dispatch(ServerSocket serverSocket, HandleMap handleMap)

{

while(true)

{

try

{

   Socket socket = serverSocket.accept();

   Runnable demultiplexer = new Demultiplexer(socket, handleMap);

   Thread thread = new Thread(demultiplexer);

   thread.start();

}

catch (IOException e)

{

   e.printStackTrace();

}

}

}

}


11. ThreadPoolDispatcher class를 생성하고 다음과 같이 코딩한다.


import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;


public class ThreadPoolDispatcher implements Dispatcher

{

static final String NUMTHREADS = "8";

static final String THREADPROP = "Threads";

private int numThreads;

public ThreadPoolDispatcher()

{

   numThreads = Integer.parseInt(System.getProperty(THREADPROP, NUMTHREADS));

}

public void dispatch(final ServerSocket serverSocket, final HandleMap handleMap)

{

for(int i = 0; i < (numThreads - 1); i++)

{

   Thread thread = new Thread()

{

public void run()

{

   dispatchLoop(serverSocket, handleMap);

}

};

  thread.start();

  System.out.println("Created and started Thread = " + thread.getName());

}

System.out.println("Iterative server starting in main thread" +

        Thread.currentThread().getName());

dispatchLoop(serverSocket, handleMap);

}

private void dispatchLoop(ServerSocket serverSocket, HandleMap handleMap

{

while(true)

{

try

{

   Socket socket = serverSocket.accept();

   Runnable demultiplexer = new Demultiplexer(socket, handleMap);

   demultiplexer.run();

}

catch (IOException e)

{

   e.printStackTrace();

}

}

}

}


12. 정상적으로 동작하면 다음과 같이 동작되어야 한다. 동시Pool의 갯수만큼의 스레드로 동시에 처리하고 나머지 접속에 대해서는 처리를 미룬다.




반응형
LIST

'Server > Reactor' 카테고리의 다른 글

Reactor의 기능  (0) 2016.08.10
블로그 이미지

만년필석사

,