이 글은 개인 공부 목적으로 작성된 포스팅입니다. 왜곡된 내용이 포함되어 있습니다.
Java IO
자바에서는 IO를 스트림으로 처리한다.
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
read() 와 같은 블로킹 메서드를 통해 읽기, 쓰기를 처리한다.
TCP 통신의 경우 ServerSocket(환영 소캣)에서 Accept한 socket을 클라이언트에게 전달하여 socket을 통해 클라이언트와 서버간 데이터를 주고 받는다. 이때 소캣에 있는 스트림을 반환 받아 입출력을 처리하게 된다.
이러한 IO처리 방식에는 몇가지 문제가 있다.
1. Blocking IO
blocking IO 이기 때문에 blocking되는 동안 다른 처리를 할 수 없다. 따라서 여러 클라이언트가 있는 상황이라면 클라이언트마다 blocking IO에 대한 별도의 쓰레드 처리가 필요 하다.
2. thread
1 으로 인해 thread가 무분별하게 증가하게 된다. 클라이언트 갯수만큼 쓰레드가 생기게 된다.
자바 NIO를 살펴보면 위 문제점을 해결할 수 있는 지 살펴보자
NIO
자바 NIO는 "Java new IO" 이다. 새로운 IO라는 것인데 크게 3가지의 개념이 도입된다.
- 버퍼
- 채널
- 셀렉터
이외에도 NIO에서 지원하는 기능들이 더 있는 것으로 알고 있는데 위 3개를 중심으로 NIO를 이해하고자 한다.
Buffer
NIO는 버퍼를 사용한다. 버퍼는 이미 사용중인거 아닌가?? 맞다. 버퍼는 IO에서 이미 사용중이다. 그러면 기존에서 IO에서 사용하는 버퍼를 생각해보자
IO는 시스템콜을 호출해야 한다. 커널에서는 시스템 콜을 통해 하드웨어에게 읽기, 쓰기 연산을 요청하게 된다.
그림에서는 DMA을 통해 하드웨어의 입출력을 지원하고 있지만, 다른 형태일 수도 있다.(지금 생각해볼 영역은 아니다.)
하드웨어에게 받은 데이터를 1차적으로 커널 영역의 버퍼에 저장되고, 커널영역의 버퍼가 유저모드의 버퍼로 복사되면서 IO가 종료되게 된다.
- 여러번의 IO가 발생한다고 가정해보자 IO마다 커널로의 복사과정이 진행된다. 만약 IO처리를 하나의 버퍼에서 처리해서 읽기 또는 쓰기를 한번에 처리한다면 시간을 줄일 수 있지 않을까?
- 유저모드의 버퍼와 커널 모드의 버퍼가 같은 주소를 공유한다면 복사 횟수를 줄일 수 있지 않을까?
자바NIO에서는 Buffer 클래스를 제공한다.
public abstract class Buffer {
// Cached unsafe-access object
static final Unsafe UNSAFE = Unsafe.getUnsafe();
static final ScopedMemoryAccess SCOPED_MEMORY_ACCESS = ScopedMemoryAccess.getScopedMemoryAccess();
/**
* The characteristics of Spliterators that traverse and split elements
* maintained in Buffers.
*/
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
//중략
}
버퍼에서는 4개의 필드가 있다.
position: 현재 버퍼가 읽어야하는 인덱스, 읽기 쓰기 연산 후 +1 된다. (3이라면 세번째 인덱스를 읽어야한다.)
capacity: 버퍼의 크기이다. 초과될 수 없다
limit: 사용자가 지정한 버퍼의 유효범위 이다. capacity를 초과할 수 없다.
mark: 저장한 위치이다. mark() 함수 호출시 position값을 저장한다. 스택 프레임처럼 생각하면 된다.
Buffer 클래스에서 제공하는 함수는 다음과 같다.
public final int position(); //getter
public final int limit(); //getter
public final int capacity(); //getter
public final Buffer position(int newPosition); //setter
public final Buffer limit(int newLimit); //setter
public final Buffer mark(); //setter
public final Buffer reset(); //mark position으로 setter
public final int remaining(); //남은 공간 return(limit-position)
public final boolean hasRemaining(); //remain의 boolean property
public abstract boolean isReadOnly();
중략
Buffer 클래스는 추상클래스이다. Buffer 구현체는 총 3개가 있다.
- HeapByteBuffer: 기본적인 Buffer 구현체이다. heap 영역에 buffer 인스턴스를 할당하는 방식으로 일반적인 객체 생성과 거이 동일하다
- DirectByteBuffer: Buffer를 커널 영역의 메모리를 사용하는 Buffer 구현체이다. GC의 영역이 아니기 때문에 직접 해제해야한다.
- MappedByteBuffer: 파일을 메모리에 맵핑하는 Buffer 구현체이다. Memory Mapped IO이다.
DirectByteBuffer
메모리는 실제 주소가 아닌 가상 메모리의 형태로 사용된다. 가상 메모리는 페이지 테이블을 통해 물리 메모리 주소로 변환된다.
따라서 서로 다른 가상 메모리 주소가 동일한 물리메모리 주소를 참조할 수 도 있다.
커널의 메모리를 참조하는 가상 메모리를 DirectByteBuffer에 할당할 것이다. 그러나 커널의 메모리를 참조하고 있더라고 직접 접근하는 것은 결국 시스템콜을 요구할 것이다.
자바 NIO에서는 네이티브 메서드라는 C로 작성된 메서드를 사용하여 커널 메모리와 상호작용한다. JVM을 공부하면 등장하는 네이티브 메서드가 여기에 사용되었던 것이다!
네이티브 메서드의 구현은 정확하게 알 수 없지만, 적어도 기존 io작업보다는 성능이 적어도 동일하거나 더 빠르다고 짐작해볼 수 있다.
private ByteBuffer getArray(int index, byte[] dst, int offset, int length) {
if (
((long)length << 0) > Bits.JNI_COPY_TO_ARRAY_THRESHOLD) {
long bufAddr = address + ((long)index << 0);
long dstOffset =
ARRAY_BASE_OFFSET + ((long)offset << 0);
long len = (long)length << 0;
try {
SCOPED_MEMORY_ACCESS.copyMemory(
scope(), null, base(), bufAddr,
dst, dstOffset, len);
} finally {
Reference.reachabilityFence(this);
}
} else {
int end = offset + length;
for (int i = offset, j = index; i < end; i++, j++) {
dst[i] = get(j);
}
}
return this;
}
위 코드는 버퍼 get()함수 호출시 내부에서 호출되는 함수로 주목해야하는 부분은 copyMemory() 메소드를 호출하고 있는데 네이티브 메서드이다.
Buffer (네이티브 메서드)
버퍼는 배열이다. 배열의 데이터를 읽을 수도 있고, 쓸 수도 있다.
버퍼 메소드로 slice(), flip()과 같이 다양한 메소드가 있다.
ByteBuffer buffer=ByteBuffer.allocate(10);
buffer.put((byte) 0).put((byte) 1).put((byte) 2).put((byte) 3).put((byte) 4)
.put((byte) 5).put((byte) 6).put((byte) 7).put((byte) 8).put((byte) 9);
buffer.position(3);
buffer.limit(9);
ByteBuffer copyBuffer=buffer.slice();
System.out.println("position: " + copyBuffer.position() + ", limit: " + copyBuffer.limit() +
", capacity: " + copyBuffer.capacity());
while(copyBuffer.hasRemaining()){
System.out.print(copyBuffer.get()+" ");
}
System.out.println();
buffer.put(3,(byte)10);
System.out.println(buffer.get(3));
System.out.println(copyBuffer.get(0));
buffer.put(4,(byte)11);
System.out.println(buffer.get(4));
System.out.println(copyBuffer.get(1));
slice()를 통해 버퍼를 앝은복사 하는 예제이다. 하나의 버퍼에서 position, limit를 controll 하여 논리적으로 격리된 새로운 버퍼를 생성하는 등 하나의 버퍼(데이터 영역)을 재사용할 수 있다. 기존에는 커널영역에서 데이터를 받기만 하였다면 Application layer에 로직을 추가하여 데이터를 원하는 형태로 핸들링 할 수 있다고 이해해도 무방하다.
채널 (Scatter, Gather, lock)
NIO는 채널을 사용한다. 채널은 스트림과 유사하게 데이터를 전달하는 인터페이스이다. NIO에는 버퍼를 사용하여 데이터를 송수신 한다. 버퍼를 사용한다는 것은 네이티브 IO를 사용하겠다는 것이다. 성능 상 이점이 있다. 또한 기존 스트림에서는 InputStream, OutputStream으로 입출력을 분리했다면 채널은 하나의 버퍼에서 입출력을 모두 처리한다.(당연히 Lock을 사용해야하고, 구현상 복잡성이 증가한다.)
public interface Channel extends CLoseable{
public boolean isOpen();
public void close() throw IOExceptions;
}
채널은 인터페이스로 2가지의 메서드만을 지원한다.
채널은 Scatter, Gather를 지원한다.
/**
* Writes a sequence of bytes to this channel from a subsequence of the
* given buffers.
*
* <p> Bytes are written starting at this channel's current file position
* unless the channel is in append mode, in which case the position is
* first advanced to the end of the file. The file is grown, if necessary,
* to accommodate the written bytes, and then the file position is updated
* with the number of bytes actually written. Otherwise this method
* behaves exactly as specified in the {@link GatheringByteChannel}
* interface. </p>
*/
public abstract long write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
채널 구현체인 GatheringByteChannel의 write 메서드로 ByteBuffer 배열에 있는 데이터를 한번에 쓰기 연산을 수행한다.(이전에 Buffer 클래스는 유연한 데이터 입출력 연산이 가능함을 기억하자)
FileOutputStream fo = new FileOutputStream("src/channel/input.txt");
GatheringByteChannel channel = fo.getChannel();
ByteBuffer header=ByteBuffer.allocateDirect(20);
ByteBuffer body=ByteBuffer.allocateDirect(40);
ByteBuffer[] buffers={header,body};
header.put("Hello ".getBytes());
body.put("World!".getBytes());
header.flip();
body.flip();
channel.write(buffers);
channel.close();
GatherTest 코드로 쓰기 연산을 한번에 처리할 수 있다. Buffer + 네이티브 IO으로 가능하다.
채널을 통해 우리는 IO연산을 줄일 수 있었다. 그렇다면 이제 불필요한 쓰레드 생성만 줄일 수 있다면 기존 IO의 문제점을 해결할 수 있다. 셀렉터를 소개하기 전에 사전 지식으로 필요한 몇가지가 있다.
Reactor 패턴
https://en.wikipedia.org/wiki/Reactor_pattern
리액터 패턴은 디자인 패턴으로 이벤트 처리 전략을 채택한 패턴이다.
요청에 대한 이벤트를 이벤트 큐에 넣어두고, 이벤트 큐를 처리하는 쓰레드가 이벤트를 하나 씩 꺼내서 프로세스 또는 쓰레드에게 이벤트 처리를 요청하는 형태이다. 리액터 패턴을 채택하고 있는 기술들이 꽤 있다.
비동기 처리
이때까지는 Blocking IO를 처리했지만 Non Blocking IO는 어떻게 처리될까? (비동기, 논 블락킹에 대한 구분은 여기서 이야기하지 않고, 동일하게 간주한다)
- Polling 방식: CPU가 주기적으로 상태를 확인하는 방식이다.
- Interrupt 방식: 장치가 직접 종료 신호를 보내는 방식이다.
Selector
셀렉터는 논 블로킹 모드를 사용하여 하나의 쓰레드 만으로 여러 IO 연산을 지원한다. IO 멀티플렉싱 이라고 한다.
Selector selector=Selector.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(portNumber));
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
셀렉터에 채널을 등록하여 사용할 수 있다. 채널을 등록하기 위해 register() 메소드를 사용한다. 이때 SelectionKey를 지정해야한다.
SelectionKeyOP
셀렉션키op는 셀렉터에 등록할 수 있는 이벤트종류이다.
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
총 4가지 모드가 있다. 쉬프트 연산에서 알 수 있듯이, AND, OR 연산이 가능하다.
Select()
셀렉터에서는 select() 메소드를 호출하여 IO 멀티플렉싱을 처리한다. 순서는 다음과 같다.
- 취소키 집합(cancelled-key set)에 있는 key를 3가지 집합에 모두 제거한다..(registered-key set, selected-key set, cancelled-key set)
- 등록키 집합(registered-key set)에 있는 key에 대해 이벤트가 발생한 key의 경우 선택키 집합(selected-key set)에 추가하고, 이미 있다면 업데이트 한다.(이벤트가 발생한 key가 적어도 하나 있을때까지 block됩다 or wake up)
- 선택키 집합(selected-key set)에 있는 key에 매핑된 Channel에 대한 처리후 선택키 집합에서 제외한다
while(true){
if(selector.select()==0){
continue;
}
for(SelectionKey key : selector.selectedKeys()){
if(key.isAcceptable()){
if(key.channel() instanceof ServerSocketChannel channel){
SocketChannel client = channel.accept();
Socket socket = client.socket();
System.out.println(socket.getInetAddress().getHostAddress()+" : "+socket.getPort()+" connect ");
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_READ);
clients.add(client);
}
}
else if(key.isReadable()){
if(key.channel() instanceof SocketChannel client) {
int read = client.read(buffer);
if(read==-1){
Socket socket = client.socket();
System.out.println(socket.getInetAddress().getHostAddress()+" : "+socket.getPort()+" connect ");
System.out.println("Disconnected");
client.close();
clients.remove(client);
}
buffer.flip();
String data = new String(buffer.array(), buffer.position(), read);
System.out.println("data : "+data);
buffer.putInt(read).flip();
//중략
}
}
}
select()의 호출과 별개로 selector.selectedKeys() 을 호출하여 SelectionKey 집합을 받아 처리해야한다.
selector.selectedKeys()는 이벤트가 발생한 키 집합으로 이벤트 발생 유무는 select() 함수 호출시 비동기 polling으로 이벤트를 확인한다. SelectionKey에는 매핑된 채널, op를 모두 확인할 수 있다. SelectionKey에 해당하는 채널의 이벤트 처리후 해당 SelectionKey를 remove 해야한다.
고민 했던 내용
- 왜 셀렉터의 select 함수 호출후 사용가능한 채널을 받는게 아니라 갯수를 받아 selectKeys를 한번더 찾아야하는가 -> select 함수는 IO 연산이 준비된 채널을 기다리는 용도이다. 준비된 채널은 SelectKeys에 추가되도록 내부적을 구현되어 있기 때문에 준비된 채널이 있으면 채널을 받아 따로 받아 처리해야한다.
- 왜 이벤트 처리 후, key를 삭제해야 하느가 -> key를 삭제하는 것은 영구적을 삭제하는 것이 아닌 SelectKeys 집합에서 삭제하겠다는 것이다. 모든 Key는 registerdKeys에 있다.
'Java' 카테고리의 다른 글
[이펙티브 자바] 아이템 64, 65, 66, 67, 6 (0) | 2024.06.18 |
---|---|
[이펙티브 자바] 아이템 54, 55 (0) | 2024.05.20 |
List.toArray() (0) | 2024.05.19 |
[이펙티브 자바] 아이템 47, 48, 49 (2) | 2024.04.27 |
[이펙티브 자바] 아이템 39, 40, 41 (1) | 2024.03.24 |