반응형
SMALL

'Server/Reactor'에 해당되는 글 2건

반응형
LIST

Reactor Thread

Server/Reactor 2016. 8. 11. 00:20
반응형
SMALL

이번 시간엔 Reactor Thread에 대해 포스팅 해보려고 한다. 서버는 한번에 한가지 일만을 처리하지 않는다. 동시다발적으로 여러가지 프로토콜을 처리하는것이 서버이다. 이렇게 동시에 처리 할 수 있도록 해주는 것이 Thread기능이다. 하지만 Thread도 적당히 있으면 좋은데 이것도 너무 많아지게 되면 갑자기 처리량이 증가해서 서버는 당연히 느려지게 된다. 그러다보면 과부하가 발생하는것은 기본이다. 그러다보면 메모리에도 부담이 가게되고 여러모로 악재가 겹친다. 그래서 사용되는 것이 Thread Pool기능이다. 접속이 들어오기 전에 미리 스레드를 만들어서 스레드를 생성하는 지연을 줄이고 스레드의 갯수를 제한하고 다 쓴 스레드를 재활용한다. 



스레드풀을 만드는 방법은 다음과 같다.


1. Demultiplexer class파일을 만들고 다음과같이 코딩한다.


import java.io.IOException;

import java.io.InputStream;

import java.net.Socket;


public class Demultiplexer implements Runnable

{

   private final int HEADER_SIZE = 6;

   private Socket socket;

   private HandleMap handleMap;

public Demultiplexer(Socket socket, HandleMap handleMap)

{

    this.socket = socket;

    this.handleMap = handleMap;

}


public void run()

{

try

{

   InputStream inputStream = socket.getInputStream();

   byte[] buffer = new byte[HEADER_SIZE];

   inputStream.read(buffer);

   String header = new String(buffer);

   handleMap.get(header).hanedleEvent(inputStream);

   socket.close();

}

catch (IOException e)

{

    e.printStackTrace();

   }

  }

}


2. Dispatcher interface를 만들어주고 다음과 같이 코딩한다.


import java.net.ServerSocket;


public interface Dispatcher 

{

   public void dispatch(ServerSocket serverSocket, HandleMap handler);


}


3. EventHandler interface를 만들어주고 다음과 같이 코딩한다.

import java.io.InputStream;


public interface EventHandler 

{

    public String getHandler();

    public void handleEvent(InputStream inputStream);


}


4.  HandleMap class를 만들고 다음과 같이 코딩한다.


import java.util.HashMap;


public class HandleMap extends HashMap<String, EventHandler> 

{


}


5. Reactor class를 만들어주고 다음과 같이 코딩한다.


import java.io.IOException;

import java.net.ServerSocket;


public class Reactor 

{

   private ServerSocket serverSocket;

   private HandleMap handleMap;

public Reactor(int port)

{

    handleMap = new HandleMap();

try

{

    serverSocket = new ServerSocket(port);

}

catch (IOException e)

{

   e.printStackTrace();

}

}

public void startServer() 

{

   Dispatcher dispatcher = new ThreadPoolDispatcher();

   dispatcher.dispatch(serverSocket, handleMap);

}

public void registerHandler(EventHandler handler)

{

   handleMap.put(handler.getHandler(), handler);

}

public void removeHandler(EventHandler handler)

{

   handleMap.remove(handler.getHandler());

}

}


6. Serverinitializer class를 만들어주고 다음과 같이 코딩한다.

import java.io.IOException;

import java.net.ServerSocket;



public class Serverinitializer 

{


public static void main(String[] args

{

  int port = 5000;

  System.out.println("Server ON :" + port);

  Reactor reactor = new Reactor(port);

  reactor.registerHandler(new StreamSayHelloEventHandler());

  reactor.registerHandler(new StreamUpdateProfileEventHandler());

  reactor.startServer();

}

}


7. StreamSayHelloEventHandler class를 만들어주고 다음과 같이 코딩한다.


import java.io.IOException;

import java.io.InputStream;

import java.util.StringTokenizer;


public class StreamSayHelloEventHandler implements EventHandler

{

   private static final int DATA_SIZE = 512;

   private static final int TOKEN_NUM = 2;

@Override

public String getHandler() 

{

   return "0x5001";

}

public void handleEvent(InputStream inputStream)

{

try

{

   byte[] buffer = new byte[DATA_SIZE];

   inputStream.read(buffer);

   String data = new String(buffer);

   String[] params = new String[TOKEN_NUM];

   StringTokenizer token = new StringTokenizer(data, "|");

   int i = 0;

 while(token.hasMoreTokens())

{

   params[i] = token.nextToken();

   ++i;

}

   sayHello(params);

}

catch (IOException e)

{

   e.printStackTrace();

}

}


8. StreamUpdateProfileEventHandler class를 생성하고 다음과 같이 코딩한다.


import java.io.IOException;

import java.io.InputStream;

import java.util.StringTokenizer;


public class StreamUpdateProfileEventHandler implements EventHandler

{

private static final int DATA_SIZE = 1024;

private static final int TOKEN_NUM = 5;

@Override

public String getHandler() 

{

return "0x6001";

}

public void handleEvent(InputStream inputStream)

{

try

{

byte[] buffer = new byte[DATA_SIZE];

inputStream.read(buffer);

String data = new String(buffer);

String[] params = new String[TOKEN_NUM];

StringTokenizer token = new StringTokenizer(data, "|");

int i = 0;

while(token.hasMoreTokens())

{

params[i] = token.nextToken();

++i;

}

updateProfile(params);

}

catch (IOException e)

{

e.printStackTrace();

}

private void updateProfile(String[] params)

{

System.out.println("UpdateProfile ->" +

            " id :" + params[0] +

            " password : " + params[1] +

            " name : " + params[2] +

            " age : " + params[3] + 

            " gender: " + params[4]);

}

}

private void sayHello(String[] params)

{

   System.out.println("SayHello -> name: " + params[0] + "age : " + params[1]);

}

}


9. TestClient class를 생성하고 다음과 같이 코딩해준다.


import java.io.IOException;

import java.io.OutputStream;

import java.net.Socket;

import java.net.UnknownHostException;


public class TestClient {


public static void main(String[] args

{

   System.out.println("Client ON");

   try

   {

      String message;

      Socket socket = new Socket("127.0.0.1", 5000);

      OutputStream out = socket.getOutputStream();

      message = "0x5001|이건주|26";

      out.write(message.getBytes());

      socket.close();


      Socket socket2 = new Socket("127.0.0.1", 5000);

      OutputStream out2 = socket2.getOutputStream();

      message = "0x6001|Lee|1234|이건주|26|남성";

      out2.write(message.getBytes());

      socket2.close();

    } 

    catch (UnknownHostException e)

    {

        e.printStackTrace();

    } 

    catch (IOException e)

   {

       e.printStackTrace();

   }

}

}


10. ThreadPerDispatcher class를 생성하고 다음과 같이 코딩한다.

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;


public class ThreadPerDispatcher implements Dispatcher

{

  public void dispatch(ServerSocket serverSocket, HandleMap handleMap)

{

while(true)

{

try

{

   Socket socket = serverSocket.accept();

   Runnable demultiplexer = new Demultiplexer(socket, handleMap);

   Thread thread = new Thread(demultiplexer);

   thread.start();

}

catch (IOException e)

{

   e.printStackTrace();

}

}

}

}


11. ThreadPoolDispatcher class를 생성하고 다음과 같이 코딩한다.


import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;


public class ThreadPoolDispatcher implements Dispatcher

{

static final String NUMTHREADS = "8";

static final String THREADPROP = "Threads";

private int numThreads;

public ThreadPoolDispatcher()

{

   numThreads = Integer.parseInt(System.getProperty(THREADPROP, NUMTHREADS));

}

public void dispatch(final ServerSocket serverSocket, final HandleMap handleMap)

{

for(int i = 0; i < (numThreads - 1); i++)

{

   Thread thread = new Thread()

{

public void run()

{

   dispatchLoop(serverSocket, handleMap);

}

};

  thread.start();

  System.out.println("Created and started Thread = " + thread.getName());

}

System.out.println("Iterative server starting in main thread" +

        Thread.currentThread().getName());

dispatchLoop(serverSocket, handleMap);

}

private void dispatchLoop(ServerSocket serverSocket, HandleMap handleMap

{

while(true)

{

try

{

   Socket socket = serverSocket.accept();

   Runnable demultiplexer = new Demultiplexer(socket, handleMap);

   demultiplexer.run();

}

catch (IOException e)

{

   e.printStackTrace();

}

}

}

}


12. 정상적으로 동작하면 다음과 같이 동작되어야 한다. 동시Pool의 갯수만큼의 스레드로 동시에 처리하고 나머지 접속에 대해서는 처리를 미룬다.




반응형
LIST

'Server > Reactor' 카테고리의 다른 글

Reactor의 기능  (0) 2016.08.10
블로그 이미지

만년필석사

,

Reactor의 기능

Server/Reactor 2016. 8. 10. 12:23
반응형
SMALL

이번시간엔 Reactor에 대해 포스팅해보려고 한다. 앞서 포스팅에서 Protocol을 2개로 만들어서 서버를 만들어 봤었다. 하지만 문제가 되는게 또다른 프로토콜을 추가하게 되면 또 클래스를 정의하고 스위치문에다 포트번호를 넣어주고 일일히 그렇게 다 만들어 줘야한다. 아래와 같은 그림처럼;;



이렇게 해서 어느세월에 프로토콜을 정의 하려 하는지... 그래서 필요한 것이 바로 Reaactor 기능이다. 



Reactor의 구조를 대강 살펴보면.... 아래 그림과 같다.








- Reactor Pattern을 사용하여 프로토콜 추가에 유연한 서버를 만든다.

- Map을 이용하여 편하게 클래스 관리를 해본다.


이것이 이번 포스팅의 목표다!! 리엑터기능을 이용한 프로토콜 정의하기!!

만드는 법은 아래와 같다.


1. EventHandler라는 interface를 생성해주고 다음과 같이 코딩한다.

import java.io.InputStream;


public interface EventHandler 

{

   public String getHandler();

   public void handleEvent(InputStream inputStream);


}


2.  HandleMap이라는 클래스를 정의해주고 다음과 같이 코딩한다.

import java.util.HashMap;


public class HandleMap extends HashMap<String, EventHandler>  ->HandleMap이라고 하는 HashMap을 상속

{


}


3. 기존에 있었던 StreamSayHelloProtocol을 StreamSayHelloEventHandler로 리액터 시켜 이름을 바꿔주고 다음과 같이 코딩해준다.

import java.io.IOException;

import java.io.InputStream;

import java.util.StringTokenizer;


public class StreamSayHelloEventHandler implements EventHandler -> 상속받은것

{

private static final int DATA_SIZE = 512;

private static final int TOKEN_NUM = 2;

@Override

public String getHandler() 

{

   return "0x5001";

}

public void handleEvent(InputStream inputStream)

{

try

{

byte[] buffer = new byte[DATA_SIZE];

inputStream.read(buffer);

String data = new String(buffer);

String[] params = new String[TOKEN_NUM];

StringTokenizer token = new StringTokenizer(data, "|");

int i = 0;

while(token.hasMoreTokens())

{

     params[i] = token.nextToken();

     ++i;

}

sayHello(params);

}

catch (IOException e)

{

   e.printStackTrace();

}

}

private void sayHello(String[] params)

{

     System.out.println("SayHello -> name: " + params[0] + "age : " + params[1]);

}

}


4. StreamUpdateProfileProtocol도 리액터 시켜 StreamUpdateProfileEventHandler로 이름을 바꿔주고 다음과 같이 코딩해준다.

import java.io.IOException;

import java.io.InputStream;

import java.util.StringTokenizer;


public class StreamUpdateProfileEventHandler implements EventHandler

{

  private static final int DATA_SIZE = 1024;

  private static final int TOKEN_NUM = 5;

  @Override

  public String getHandler() 

{

   return "0x6001";

}

public void handleEvent(InputStream inputStream)

{

try

{

byte[] buffer = new byte[DATA_SIZE];

inputStream.read(buffer);

String data = new String(buffer);

String[] params = new String[TOKEN_NUM];

StringTokenizer token = new StringTokenizer(data, "|");

int i = 0;

while(token.hasMoreTokens())

{

   params[i] = token.nextToken();

   ++i;

}

updateProfile(params);

}

catch (IOException e)

{

   e.printStackTrace();

}

}

private void updateProfile(String[] params)

{

System.out.println("UpdateProfile ->" +

            " id :" + params[0] +

            " password : " + params[1] +

            " name : " + params[2] +

            " age : " + params[3] + 

            " gender: " + params[4]);

}

}


5. Reactor 클래스를 만들어주고 다음과 같이 코딩해준다.

import java.io.IOException;

import java.net.ServerSocket;


public class Reactor 

{

private ServerSocket serverSocket;

private HandleMap handleMap;  ->리액터에서 관리할 HandleMap 등록

public Reactor(int port)

{

   handleMap = new HandleMap();  ->리액터에서 관리할 HandleMap등록

try

{

   serverSocket = new ServerSocket(port);

}

catch (IOException e)

{

   e.printStackTrace();

}

}

public void startServer() 

{

  Dispatcher dispatcher = new Dispatcher();

while(true)

{

  dispatcher.dispatch(serverSocket, handleMap);

}

}

public void registerHandler(EventHandler handler)

{

  handleMap.put(handler.getHandler(), handler); ->핸들러의 헤더등록, 핸들러등록

}

public void removeHandler(EventHandler handler)

{

  handleMap.remove(handler.getHandler()); ->핸들러의 헤더등록, 핸들러등록

}

}


6. Serverinitializer로 돌아와 리액터를 등록해 준다.

import java.io.IOException;

import java.net.ServerSocket;



public class Serverinitializer 

{


public static void main(String[] args

{

  int port = 5000;

  System.out.println("Server ON :" + port);

  Reactor reactor = new Reactor(port);

  reactor.registerHandler(new StreamSayHelloEventHandler());

  reactor.registerHandler(new StreamUpdateProfileEventHandler());

  reactor.startServer();

}

}


7. Dispacher 클래스를 만들고 다음과 같이 코딩해준다.

import java.io.IOException;

import java.io.InputStream;

import java.net.ServerSocket;

import java.net.Socket;


public class Dispatcher

{

private final int HEADER_SIZE = 6;

public void dispatch(ServerSocket serverSocket, HandleMap handleMap)

{

try

{

Socket socket = serverSocket.accept();

demultiplex(socket, handleMap);

}

catch (IOException e)

{

e.printStackTrace();

}

}

public void demultiplex(Socket socket, HandleMap handleMap)

{

try

{

InputStream inputStream = socket.getInputStream();

byte[] buffer = new byte[HEADER_SIZE];

inputStream.read(buffer);

String header = new String(buffer);

handleMap.get(header).handleEvent(inputStream);

}

catch(IOException e)

{

e.printStackTrace();

}

}

}


8. 기존에 만들어 놨던 TestClient를 실행해보면....


다음과 같이 그 전에 만들어 놨던 것과 같이 잘 실행된다는 걸 볼 수 있다. 이젠 만들어논 클래스 파일들을 핸들러맵으로 상속시켜서 리액터 시켜놓으면 이상한 번거로움 없이 잘 실행 시킬 수 있다는 것을 보여주고 있다.




반응형
LIST

'Server > Reactor' 카테고리의 다른 글

Reactor Thread  (0) 2016.08.11
블로그 이미지

만년필석사

,