간단히 Echo 서버 만들어보기


ACE_Reactor는 ACE_Event_Handler의 인스턴스를 관리합니다. ACE_Reactor::register_handler 함수로 관심있는 이벤트 타입을 설정합니다. ACE_Reactor::remove_handler 함수로 기존 등록된 이벤트 타입을 제거하도록 요청할 수 있습니다.

handle_input 함수는 새로운 사용자가 접속하거나, 새로운 데이터 받았거나, 사용자가 접속을 끊으면 호출됩니다.

네크웍 서버는 기본적으로 ACE_SOCK_Acceptor로 사용자 접속을 기다립니다. ACE_SOCK_Acceptor::open 함수로 특정 IP와 포트로 바인딩이 가능합니다. 기다리던 ACE_SOCK_Acceptor를 ACE_Reactor에 ACE_Event_Handler::ACCEPT_MASK로 이벤트를 등록하면 handle_input으로 이벤트가 콜백됩니다.

사용자 접속이 이루어지면 그제서야 새로운 ACE_Event_Handler로부터 상속받은 서비스 객체를 생성하고 접속을 허가하고 생성된 서비스 객체 역시 기존 ACE_Reactor 객체에 이벤트 타입을 등록해서 받길 원하는 이벤트를 설정합니다.

handle_input에서 recv로 읽은 데이터의 길이가 0보다 같거나 작으면, return -1를 해서 접속을 끊어주어야 합니다. 0 이하의 값은 사용자가 접속을 끊었거나 ACE_SOCK_Stream 객체에 알수 없는 에러가 발생한 경우입니다.

EchoService.h
#pragma  once

#include  <ace/Event_Handler.h>
#include  <ace/Reactor.h>
#include  <ace/SOCK_Acceptor.h>
#include  <ace/SOCK_Stream.h>
#include  <ace/INET_Addr.h>
#include  <ace/Message_Queue.h>
#include  <ace/Synch.h>


class EchoService:public ACE_Event_Handler
{
private:
    ACE_SOCK_Stream                   peer_;
    ACE_INET_Addr                   addr_;
    ACE_Message_Queue<ACE_MT_SYNCH>       send_datas_;


public:
    EchoService(ACE_SOCK_Acceptor& acceptor, ACE_Reactor* reactor);
    ~EchoService(void);

   virtual ACE_HANDLE get_handle (void) const;
   virtual  int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
   virtual  int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
   virtual  int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
};

EchoService.cpp
#include  "EchoService.h"

#include  <iostream>
#include  <ace/OS.h>

EchoService::EchoService(ACE_SOCK_Acceptor& acceptor, ACE_Reactor* reactor)
    :ACE_Event_Handler(reactor),peer_(ACE_INVALID_HANDLE)
{
   if( -1 == acceptor.accept(peer_,&addr_) ){
      delete  this;
      return;
   }

   char addr_string[256];
    addr_.addr_to_string(addr_string, 256);
    std::cout<<std::endl<<"Enter Client:"<<addr_string<<std::endl;

   this->reactor()->register_handler( this,ACE_Event_Handler::READ_MASK);
}


EchoService::~EchoService(void)
{
}


ACE_HANDLE
EchoService::get_handle (void) const{
   return  this->peer_.get_handle();
}

int 
EchoService::handle_input (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/){
   const  int BUF =1024;
   unsigned  char in[BUF];
   
    ACE_Time_Value vt(0,0);

    ssize_t len =this->peer_.recv(in, BUF,&vt);
   if( len <= 0){//클라이언트  종료. 통신  에러
      return -1;
   }

    ACE_Message_Block* mb = new ACE_Message_Block(len);
    ACE_OS::memcpy( mb->wr_ptr(), in, len);
    mb->wr_ptr(len);
    send_datas_.enqueue_tail( mb);

   this->reactor()->register_handler( this, ACE_Event_Handler::WRITE_MASK);
   return 0;
}

int 
EchoService::handle_output (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/){
    ACE_Message_Block* mb(NULL);
    ACE_Time_Value rt(0,0);
   do{
      this->send_datas_.dequeue_head(mb,&rt);

       ssize_t len=this->peer_.send(mb->rd_ptr(), mb->length(),&rt);
      if( len <0){//통신  에러
          mb->release();
         return -1;
      }

       mb->rd_ptr(len);
      if( mb->length()>0){
         this->send_datas_.enqueue_head( mb );
         break;
      }
       mb->release();
   }while(false);

   if( this->send_datas_.is_empty()){
       ACE_Reactor_Mask m = ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::DONT_CALL;
      this->reactor()->remove_handler(this, m);
   }
   return 0;
}

int 
EchoService::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask){
   char addr_string[256];
    addr_.addr_to_string(addr_string, 256);
    std::cout<<std::endl<<"Exit Client:"<<addr_string<<std::endl;


    ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
   this->reactor()->remove_handler(this,m);
   this->send_datas_.flush();
   this->peer_.close();
   delete  this;
   return 0;
}

GAcceptor.h
#pragma  once

#include  <ace/Event_Handler.h>
#include  <ace/Reactor.h>
#include  <ace/SOCK_Acceptor.h>


class  GAcceptor:public  ACE_Event_Handler
{
private:
   ACE_SOCK_Acceptor acceptor_;

public:
    GAcceptor(const  char* ipstr,ACE_Reactor* reactor);
    ~GAcceptor(void);

   virtual  ACE_HANDLE get_handle (void) const;
   virtual  int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
   virtual  int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
};

GAcceptor.cpp
#include  "GAcceptor.h"

#include  <ace/INET_Addr.h>
#include  <iostream>
#include  "EchoService.h"

GAcceptor::GAcceptor(const  char* ipstr,ACE_Reactor* reactor)
    :ACE_Event_Handler(reactor)
{
   ACE_INET_Addr addr(ipstr);
   
   int bret = acceptor_.open(addr,1);
   if( -1==bret){
       std::cout<<std::endl<<"Listen fail:"<<ipstr<<std::endl;
      delete  this;
      return;
   }

    std::cout<<std::endl<<"Server start:"<<ipstr<<std::endl;

   this->reactor()->register_handler( this, ACE_Event_Handler::ACCEPT_MASK);
}


GAcceptor::~GAcceptor(void)
{
}


ACE_HANDLE 
GAcceptor::get_handle (void) const{
   return  this->acceptor_.get_handle();
}

int 
GAcceptor::handle_input (ACE_HANDLE  fd/* = ACE_INVALID_HANDLE*/){
   new  EchoService(this->acceptor_,this->reactor());
   return 0;
}

int 
GAcceptor::handle_close (ACE_HANDLE  handle,ACE_Reactor_Mask  close_mask){
   ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
   this->reactor()->remove_handler( this, m);
   this->acceptor_.close();
   delete  this;
   return 0;
}

Quit.h
#pragma  once
#include  <ace/Reactor.h>
#include  <ace/Event_Handler.h>

class  Quit:public  ACE_Event_Handler
{
public:
    Quit(ACE_Reactor* reactor);
    ~Quit(void);

   virtual  int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
   virtual  int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);
   virtual  int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
};

Quit.cpp
#include  "Quit.h"
#include  <ace/OS.h>

Quit::Quit(ACE_Reactor* reactor)
    :ACE_Event_Handler(reactor)
{
   this->reactor()->register_handler(SIGBREAK, this);
}


Quit::~Quit(void)
{
}


int 
Quit::handle_signal (int signum, siginfo_t * /*= 0*/, ucontext_t * /*= 0*/){
   this->reactor()->notify(this);
    ACE_OS::sleep(2);
   return 0;
}

int 
Quit::handle_exception (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/){
   this->reactor()->end_reactor_event_loop();
   return -1;
}

int 
Quit::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask){
    ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
   this->reactor()->remove_handler( this, m);
   delete  this;
   return 0;
}

EchoServer.cpp
// EchoServer.cpp : Defines the entry point for the console application.
//

#include  "stdafx.h"

#include  <ace/ACE.h>
#include  "GAcceptor.h"
#include  "Quit.h"

#include  <ace/TP_Reactor.h>
#include  <ace/Reactor.h>

#include  <thread>

void fn(ACE_Reactor* app){
   app->run_reactor_event_loop();
}

int  _tmain(int  argc, _TCHAR* argv[])
{
    ACE::init();

   ACE_TP_Reactor tp;
   ACE_Reactor reactor(&tp);

   new  Quit(&reactor);
   new  GAcceptor("192.168.0.2:1000", &reactor);
   new  GAcceptor("192.168.0.2:5500",&reactor);

    std::thread t1( fn, &reactor);
    std::thread t2( fn, &reactor);

    t1.join();
    t2.join();

    ACE::fini();
   return 0;
}

- 목록:

18 thoughts on “ACE Proactor Framework 10 – Acceptor-Connector 예제 2

  1. 시리얼 통신강의를 빼고 모든 강의를 전부 시청하였습니다.
    이렇게 좋은 강의 자료를 만들어 주셔서 감사합니다.
    상용화 해도 손색이 없겠네요^^

  2. 훌륭한 강의에 감사드립니다.
    예제를 따라하다가 ACE::init()과 ACE::fini() 를 헤더에서 찾을 수 없다는 에러를 내었습니다.
    저는 ACE-6.3.1을 사용하였는데, ACE-6.1.9 버전으로 테스트 해 보니 또 잘 되더라구요.
    6.3.1 에서는 include 아래에 를 추가해서 문제를 해결했습니다.
    무슨 연유에서인지 저는 ACE.h 가 Init_ACE.h 를 포함하지 않았나봅니다.
    혹시 같은 고민을 하시는 저 같은 초보자가 있을까 하여 로그를 남깁니다.

    • 제가 아는 선에서는 예전 5 버전대에서는 ace/ace.h 에 초기화 로직이 있었던걸로 기억합니다.
      그런데 6점대로 넘어가면서 ace/ace.h -> ace/ACE.h 등으로 세분화? 되면서
      ACE::init(), ACE::fini()가 ace/Init_ACE.h로 나뉜것 같더라고요.

  3. 안녕하세요. 질문을 드리고자합니다.^^
    제가 ace를 이용한 네트워크 프로그램하나를 인수받았는데요.
    클라이언트를 62개 이상 받아들이면 오류가 나는겁니다. 그래서 찾아보니
    ACE_Reator 를 사용하여 개발된 중계서버 문제. 수백 또는 수천개의 핸들을 다중 수신하도록 설정이 가능한 ACE_Select_Reator, ACE_TP_Reator와는 달리, ACE_WFMO_Reator는 62개 이상의 핸들은 처리할 수 없다. 이 제한은 Windows에서는 단지 WaitForMultipleObject()함수의 스레드당 대기 가능 핸들 개수가 64개라는 사실때문이다.

    ACE_WFMO_Reator는 내부적으로 64개의 핸들중 2개를 별도로 사용하기 때문에 62개의 핸들만 사용가능하고, Windows에서 ACE_Reator를 사용시에는 디폴트로 ACE_WFMO_Reator가 Base 이다.

    -이 증상을 해결하려면 ACE_Proactor를 사용하거나, ACE_Select_Reator를 사용해야한다.

    이렇게 나와있더라구요. 저는 ace를 이용해본적이 없어서 급한마음에 책을 보고 ACE_Select_Reator로 대처를 하려고 하는데요.

    m_pAcceptor = new ClientAcceptor;
    m_pAcceptor->reactor( ACE_Reactor::instance() );

    이부분을
    ACE_Select_Reactor sr;
    m_reactor = new ACE_Reactor(&sr);// ACE_Reactor* m_reactor;
    m_pAcceptor->reactor( ACE_Reactor::instance(m_reactor) );
    이렇게 수정하니 62개이상의 클라이언트 오류는 사라졌지만..

    m_reactor->reset_reactor_event_loop();
    m_reactor->run_reactor_event_loop();

    이부분에서 죽더라구요 ㅜㅜ

    ace 강의를 심도있게 들은 후 공부좀 해서 하고싶지만 시간이 없는 관계상 이렇게 질문드려봅니다.
    어떤 다른 부분을 수정해야 하는지 궁금합니다.
    이렇게 질문부터 드려서 죄송합니다.

    • 일단 기존 64개 이상 핸들링 문제는 잘 해결하신 듯하군요.

      전체 코딩을 보지 않아서 정확하게 답변드리긴 힘들어보이는군요.

      일단 reset_reactor_event_loop를 호출하는 이유는 파악하셔야 할듯 합니다. 일반적으로 run_reactor_event_loop만 호출만 되는데 .. 말이죠.

      일단 reset_reactor_event_loop를 호출하면 sr이 비활성화 상태로 들어갑니다. 비활성화 상태가 되면, 자신이 관리하던 핸들러에 대해 소거 작업이 이후로 진행될 터인데, 이 과정에서 오류가 발생하는 듯하군요.

      결국 reset_reactor_event_loop를 호출해야하는 이유를 파악하셔야 할듯합니다. 어지간해서는 호출할 일이 없을 듯한데.

      • 아.. 답변 감사합니다~ 꾸벅~
        알고보니 제가 기초적인 실수를 했습니다. ㅎㅎ
        m_reactor->reset_reactor_event_loop();
        m_reactor->run_reactor_event_loop();
        이부분이 svc() 스레드에서 돌아가는것인데..

        ACE_Select_Reactor sr;
        m_reactor = new ACE_Reactor(&sr);// ACE_Reactor* m_reactor;
        이부분은 스레드 돌아가기전 제가 만든 init()함수에서 작성한 코드라..
        레퍼런스로 넘긴 sr의 핸들이 소멸되버려서 발생한 런타임 purecall()오류인것이였습니다.

        그래서..
        m_tp = new ACE_TP_Reactor;
        m_reactor = new ACE_Reactor(m_tp);
        m_pAcceptor->reactor(m_reactor);

        멤버변수로 생성하여..

        m_reactor->reset_reactor_event_loop();
        m_reactor->run_reactor_event_loop();
        이렇게 하니 아주 잘됩니다. ㅎㅎ
        헬프 답변 너무 감사드립니다.

        올려주신 강의를 하나 보았는데 정말 감명받았습니다.ㅜㅜ
        나머지 강의들도 보면서 모두 만들어 보려합니다.
        정말 존경스럽습니다.
        다시한번 감사드립니다. (–)(__)

  4. 이런 질문을 드려도 될지 모르겠는데…

    Service Configurator Framework 2 – ACE_Service_Config 클래스 이해 에서
    remove Commander 로 자기자신을 죽일 때

    if( 0 == in[len-1]){
    std::cout<<std::endl<<"User Command:"<<msg_.rd_ptr()<< std::endl;
    ACE_Service_Config::process_directive(msg_.rd_ptr());
    std::cout<<"after process_directive"<<msg_.rd_ptr()<<std::endl;
    msg_.length(0);
    msg_.crunch();
    }

    process_directive 를 거치면서, 프로그램이 죽네요.. 디버깅을 타보니, msg_.rd_ptr() 이 directive[] 로 연결이 되는데, 이 포인터가 의미가 없는 값이 들어있네요.. Bad_ptr 로…
    제 생각으로는, Commander 가 죽으면서, Bad_ptr 이 되어버린거 같은데…
    어떻게 해결할 수 있을지, 감이 잘 안옵니다… ㅠㅠ;

    • fini() 까지는 호출이 되고 나서 죽습니다…
      ServiceLoader 에서 스트링을 참조하려다가 죽는듯한데…
      (글 수정이 안되는게 좀 불편하네요~ㅎㅎ, 강의, 감사하게 보고 있습니다.)

      • svc.conf 파일이 정상적인지 일단 확인하셔야 할 듯합니다.
        단순 문자열 관련 에러라면, 조금만 노력하시면 에러를 잡아낼 수 있을 겁니다.
        조금 더 자세한 정보를 제공해준다면, 조금 더 도움이 되는 답변이 가능합니다.

      • 저도 같은 문제거 같습니다. fini()까지 호출되고 main()에서 error가 나서 죽습니다.
        혹시 알고 계시면 알려주세요~~ 감사합니다. 강의 감사 드립니다.

  5. 안녕하십니까.
    이런질문도 드려도 될지…

    제가 window mfc dialog 기반에서 Proactor을 이용해 서버를 만들려고 하는데

    ACE_Asynch_Acceptor의 open 함수를 호출하면 정상적으로 동작은 되는데, 종료시(런타임시..)
    memory leak이 발생합니다.

    mfc 가 아닌 win32 console 로 프로젝트를 생성하면 memory leak이 발생하지 않습니다.
    혹시 이 문제에대한 해결방법이 있을까요? 도움이 필요합니다?

    감사합니다.

    • 해당 momory leak 현상은 proactor thread 종료된 후
      accept close가 호출됨으로 cancel io 대한 처리 부분이 에러가 발생한 경우일듯 합니다.
      (close도 요청되는 요청 작업이고, 해당하는 응답 처리가 되어야 하지만, 이미 쓰레드가 죽었음으로 이런 경우엔 불가능합니다.)

      따라서 accept close 후 proactor thread 종료로 변경시 해당 leak은 발생하지 않을 듯 합니다.

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>