간단히 Echo 클라이언트 만들기


ACE_Reactor는 ACE_Event_Handler의 인스턴스를 관리합니다. ACE_Reactor::register_handler 함수로 관심있는 이벤트 타입을 설정합니다. ACE_Reactor::remove_handler 함수로 기존 등록된 이벤트 타입을 제거하도록 요청할 수 있습니다.

ACE_SOCK_Connector로 서버에 접속이 가능합니다.접속시 사용되는 ACE_SOCK_Stream 객체에 이벤트를 다음과 같이 등록가능합니다.

ACE_Time_Value vt(0,0);
connector_.connect(peer_,addr,&vt);
this->reactor()->register_handler(this,ACE_Event_Handler::CONNECT_MASK);

시간을 {0,0}으로 설정하고 호출하면 비동기 접속이 가능합니다. 또한 CONNECT_MASK로 이벤트를 등록하면 됩니다. 이렇게 등록하면 3개의 이벤트 핸들러가 호출됩니다.

virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);

handle_input과 handle_exception는 서버 접속 실패시, handle_output는 서버 접속 성공시 호출됩니다.
서버 접속시 성공한 이벤트인 handle_output 이벤트 핸들러에서는 connector_.complete 함수를 호출해서 최종적으로 완료한 ACE_SOCK_STREAM에 대한 정보를 업데이트해야 ACE_Reactor에 이벤트를 등록할 수 있습니다.

서비스 객체는 스택에, 접속 객체는 힙에 동적으로 생성되고, 본연의 일이 마무리되면 힙에서 자동적으로 소멸되도록 설계했습니다.

서비스 객체를 힙에 생성하는 경우에는 CONNECTION MAP과 동기화 객체를 사용해서 제어해야 소멸된 객체를 참조하지 않도록 설계해야 합니다. CONNECTION MAP는 특정 아이디로 서비스 객체가 있는지를 확인할 수 있습니다. 문제는 서비스 객체를 사용하는 중에, 서비스 객체가 소멸하는 경우입니다. 이런 경우는 동기화 객체로 소멸 작업을 늦추도록 해야 합니다. 자세한 부분은 차후에 설명하도록 하겠습니다.

EchoService.h
#pragma  once
#include  <ace/Event_Handler.h>
#include  <ace/Reactor.h>
#include  <ace/SOCK_Stream.h>
#include  <ace/Message_Queue.h>
#include  <ace/Synch.h>

class  EchoService:public  ACE_Event_Handler
{
public:
   enum  CSTATE{
      C_INIT,
      C_CONNECTING,
      C_FAIL,
      C_SUCCESS
   };

private:
   ACE_SOCK_Stream               peer_;
   CSTATE                     state_;
   ACE_Message_Queue<ACE_MT_SYNCH> send_datas_;

public:
    EchoService(ACE_Reactor* reactor);
    ~EchoService(void);

   void state(CSTATE state);
   CSTATE state() const;

   ACE_SOCK_Stream& peer();
   virtual  ACE_HANDLE get_handle (void) const;
   virtual  int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
   virtual  int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
   virtual  int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);

   int send(void* data, ssize_t len);
};

EchoService.cpp
#include  "EchoService.h"
#include  <iostream>
#include  <ace/OS.h>

EchoService::EchoService(ACE_Reactor* reactor)
    :ACE_Event_Handler(reactor),peer_(ACE_INVALID_HANDLE),state_(C_INIT)
{
}


EchoService::~EchoService(void)
{
}

void 
EchoService::state(CSTATE  state){
   this->state_ = state;
   if( C_SUCCESS!=this->state_) return;
   this->reactor()->register_handler( this, ACE_Event_Handler::READ_MASK);
}

EchoService::CSTATE 
EchoService::state() const{
   return  this->state_;
}

ACE_SOCK_Stream&
EchoService::peer(){
   return  this->peer_;
}

ACE_HANDLE 
EchoService::get_handle (void) const{
   return  this->peer_.get_handle();
}

int 
EchoService::handle_input (ACE_HANDLE  fd/* = ACE_INVALID_HANDLE*/){
   const  int BUF =1024;
   unsigned  char in[BUF];

   ACE_Time_Value rt(0,0);

   ssize_t len = this->peer_.recv(in,BUF-1,&rt);
   if( len<=0){// 서버  종료  또는  통신  에러
      return -1;
   }

    in[len]=NULL;
    std::cout<<std::endl<<"Server Data:"<<in<<std::endl;
   return 0;
}

int 
EchoService::handle_close (ACE_HANDLE  handle,ACE_Reactor_Mask  close_mask){
    std::cout<<"Disconntected"<<std::endl;
   ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
   this->reactor()->remove_handler( this, m);

   this->peer_.close();
   this->peer_.set_handle(ACE_INVALID_HANDLE);
   this->state(C_INIT);
   this->send_datas_.flush();
   return 0;
}

int 
EchoService::send(void* data, ssize_t  len){
   if(C_SUCCESS!= this->state_) return -1;

   ACE_Message_Block* mb = new  ACE_Message_Block(len);
    ACE_OS::memcpy( mb->wr_ptr(), data, len);
    mb->wr_ptr(len);
   this->send_datas_.enqueue_tail( mb );
   this->reactor()->register_handler( this, ACE_Event_Handler::WRITE_MASK);
   this->reactor()->register_handler( this, ACE_Event_Handler::READ_MASK);
   return 0;
}

int 
EchoService::handle_output (ACE_HANDLE  fd/* = ACE_INVALID_HANDLE*/){
   ACE_Message_Block* mb(NULL);
   ACE_Time_Value rt(0,0);

   do {
      this->send_datas_.dequeue_head(mb,&rt);

      ssize_t len = this->peer_.send( mb->rd_ptr(), mb->length(),&rt);
      if( len < 0){//통신  에러  발생
          mb->release();
         return -1;
      }

       mb->rd_ptr(len);
      if( mb->length()>0){
         this->send_datas_.enqueue_head(mb);
         break;
      }

       mb->release();

   }while(false);

   if( this->send_datas_.is_empty()){
      ACE_Reactor_Mask m = ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::DONT_CALL;
      this->reactor()->remove_handler(this, m);
   }

   return 0;
}

GConnector.h
#pragma  once

#include  <ace/Event_Handler.h>
#include  <ace/Reactor.h>
#include  <ace/SOCK_Connector.h>
#include  "EchoService.h"

class  GConnector:public  ACE_Event_Handler
{
private:
   ACE_SOCK_Connector connector_;
   EchoService&    stream_;
public:
    GConnector(const  char* ipstr,ACE_Reactor* reactor, EchoService& stream);
    ~GConnector(void);

   virtual  ACE_HANDLE get_handle (void) const;
   //접속  이벤트
   virtual  int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
   virtual  int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
   virtual  int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);

   virtual  int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
};

GConnector.cpp
#include  "GConnector.h"
#include  <ace/INET_Addr.h>
#include  <iostream>

GConnector::GConnector(const  char* ipstr,ACE_Reactor* reactor, EchoService& stream)
    :ACE_Event_Handler(reactor),stream_(stream)
{
   ACE_INET_Addr addr(ipstr);
   ACE_Time_Value rt(0,0);

    connector_.connect( stream_.peer(),addr,&rt);
    stream_.state( EchoService::C_CONNECTING);

   this->reactor()->register_handler(this,ACE_Event_Handler::CONNECT_MASK);
   
}


GConnector::~GConnector(void)
{
}


ACE_HANDLE 
GConnector::get_handle (void) const{
   return stream_.peer().get_handle();
}

int 
GConnector::handle_input(ACE_HANDLE  fd/* = ACE_INVALID_HANDLE*/){
   //접속  실패
    std::cout<<std::endl<<"Connect fail"<<std::endl;
    stream_.state( EchoService::C_FAIL);
   return -1;
}

int 
GConnector::handle_output(ACE_HANDLE  fd/* = ACE_INVALID_HANDLE*/){
   //접속  성공
    std::cout<<std::endl<<"Connect success"<<std::endl;
   this->connector_.complete(stream_.peer());
    stream_.state( EchoService::C_SUCCESS);

   return -1;
}

int 
GConnector::handle_exception(ACE_HANDLE  fd/* = ACE_INVALID_HANDLE*/){
   // 접속  실패
   return handle_input(fd);
}

int 
GConnector::handle_close (ACE_HANDLE  handle,ACE_Reactor_Mask  close_mask){
   ACE_Reactor_Mask m =ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL;
   this->reactor()->remove_handler(this, m);
   delete  this;
   return 0;
}

EchoClient.cpp
// EchoClient.cpp : Defines the entry point for the console application.
//

#include  "stdafx.h"

#include  <ace/ACE.h>
#include  <ace/Select_Reactor.h>
#include  <ace/Reactor.h>
#include  "GConnector.h"
#include  "EchoService.h"
#include  <thread>
#include  <string>
#include  <ace/OS.h>
#include  <iostream>

void fn(ACE_Reactor* app){
   app->owner( ACE_OS::thr_self());
   app->run_reactor_event_loop();
}

int  _tmain(int  argc, _TCHAR* argv[])
{
    ACE::init();

   ACE_Select_Reactor sr;
   ACE_Reactor reactor(&sr);

   EchoService stream(&reactor);

   new  GConnector("192.168.0.2:1000",&reactor, stream);

    std::thread t1(fn,&reactor);

    std::string end("EXIT");
    std::string cmd;

   do {
       std::getline( std::cin,cmd);
      if( end == cmd){
          reactor.end_reactor_event_loop();
         break;
      }

       stream.send( (void*)cmd.c_str(), strlen( cmd.c_str() ) );

   } while( true);
   
    t1.join();

    ACE::fini();
   return 0;
}

- 목록:

21 thoughts on “ACE Proactor Framework 10 – Acceptor-Connector 예제 2

  1. 시리얼 통신강의를 빼고 모든 강의를 전부 시청하였습니다.
    이렇게 좋은 강의 자료를 만들어 주셔서 감사합니다.
    상용화 해도 손색이 없겠네요^^

  2. 훌륭한 강의에 감사드립니다.
    예제를 따라하다가 ACE::init()과 ACE::fini() 를 헤더에서 찾을 수 없다는 에러를 내었습니다.
    저는 ACE-6.3.1을 사용하였는데, ACE-6.1.9 버전으로 테스트 해 보니 또 잘 되더라구요.
    6.3.1 에서는 include 아래에 를 추가해서 문제를 해결했습니다.
    무슨 연유에서인지 저는 ACE.h 가 Init_ACE.h 를 포함하지 않았나봅니다.
    혹시 같은 고민을 하시는 저 같은 초보자가 있을까 하여 로그를 남깁니다.

    • 제가 아는 선에서는 예전 5 버전대에서는 ace/ace.h 에 초기화 로직이 있었던걸로 기억합니다.
      그런데 6점대로 넘어가면서 ace/ace.h -> ace/ACE.h 등으로 세분화? 되면서
      ACE::init(), ACE::fini()가 ace/Init_ACE.h로 나뉜것 같더라고요.

  3. 안녕하세요. 질문을 드리고자합니다.^^
    제가 ace를 이용한 네트워크 프로그램하나를 인수받았는데요.
    클라이언트를 62개 이상 받아들이면 오류가 나는겁니다. 그래서 찾아보니
    ACE_Reator 를 사용하여 개발된 중계서버 문제. 수백 또는 수천개의 핸들을 다중 수신하도록 설정이 가능한 ACE_Select_Reator, ACE_TP_Reator와는 달리, ACE_WFMO_Reator는 62개 이상의 핸들은 처리할 수 없다. 이 제한은 Windows에서는 단지 WaitForMultipleObject()함수의 스레드당 대기 가능 핸들 개수가 64개라는 사실때문이다.

    ACE_WFMO_Reator는 내부적으로 64개의 핸들중 2개를 별도로 사용하기 때문에 62개의 핸들만 사용가능하고, Windows에서 ACE_Reator를 사용시에는 디폴트로 ACE_WFMO_Reator가 Base 이다.

    -이 증상을 해결하려면 ACE_Proactor를 사용하거나, ACE_Select_Reator를 사용해야한다.

    이렇게 나와있더라구요. 저는 ace를 이용해본적이 없어서 급한마음에 책을 보고 ACE_Select_Reator로 대처를 하려고 하는데요.

    m_pAcceptor = new ClientAcceptor;
    m_pAcceptor->reactor( ACE_Reactor::instance() );

    이부분을
    ACE_Select_Reactor sr;
    m_reactor = new ACE_Reactor(&sr);// ACE_Reactor* m_reactor;
    m_pAcceptor->reactor( ACE_Reactor::instance(m_reactor) );
    이렇게 수정하니 62개이상의 클라이언트 오류는 사라졌지만..

    m_reactor->reset_reactor_event_loop();
    m_reactor->run_reactor_event_loop();

    이부분에서 죽더라구요 ㅜㅜ

    ace 강의를 심도있게 들은 후 공부좀 해서 하고싶지만 시간이 없는 관계상 이렇게 질문드려봅니다.
    어떤 다른 부분을 수정해야 하는지 궁금합니다.
    이렇게 질문부터 드려서 죄송합니다.

    • 일단 기존 64개 이상 핸들링 문제는 잘 해결하신 듯하군요.

      전체 코딩을 보지 않아서 정확하게 답변드리긴 힘들어보이는군요.

      일단 reset_reactor_event_loop를 호출하는 이유는 파악하셔야 할듯 합니다. 일반적으로 run_reactor_event_loop만 호출만 되는데 .. 말이죠.

      일단 reset_reactor_event_loop를 호출하면 sr이 비활성화 상태로 들어갑니다. 비활성화 상태가 되면, 자신이 관리하던 핸들러에 대해 소거 작업이 이후로 진행될 터인데, 이 과정에서 오류가 발생하는 듯하군요.

      결국 reset_reactor_event_loop를 호출해야하는 이유를 파악하셔야 할듯합니다. 어지간해서는 호출할 일이 없을 듯한데.

      • 아.. 답변 감사합니다~ 꾸벅~
        알고보니 제가 기초적인 실수를 했습니다. ㅎㅎ
        m_reactor->reset_reactor_event_loop();
        m_reactor->run_reactor_event_loop();
        이부분이 svc() 스레드에서 돌아가는것인데..

        ACE_Select_Reactor sr;
        m_reactor = new ACE_Reactor(&sr);// ACE_Reactor* m_reactor;
        이부분은 스레드 돌아가기전 제가 만든 init()함수에서 작성한 코드라..
        레퍼런스로 넘긴 sr의 핸들이 소멸되버려서 발생한 런타임 purecall()오류인것이였습니다.

        그래서..
        m_tp = new ACE_TP_Reactor;
        m_reactor = new ACE_Reactor(m_tp);
        m_pAcceptor->reactor(m_reactor);

        멤버변수로 생성하여..

        m_reactor->reset_reactor_event_loop();
        m_reactor->run_reactor_event_loop();
        이렇게 하니 아주 잘됩니다. ㅎㅎ
        헬프 답변 너무 감사드립니다.

        올려주신 강의를 하나 보았는데 정말 감명받았습니다.ㅜㅜ
        나머지 강의들도 보면서 모두 만들어 보려합니다.
        정말 존경스럽습니다.
        다시한번 감사드립니다. (–)(__)

  4. 이런 질문을 드려도 될지 모르겠는데…

    Service Configurator Framework 2 – ACE_Service_Config 클래스 이해 에서
    remove Commander 로 자기자신을 죽일 때

    if( 0 == in[len-1]){
    std::cout<<std::endl<<"User Command:"<<msg_.rd_ptr()<< std::endl;
    ACE_Service_Config::process_directive(msg_.rd_ptr());
    std::cout<<"after process_directive"<<msg_.rd_ptr()<<std::endl;
    msg_.length(0);
    msg_.crunch();
    }

    process_directive 를 거치면서, 프로그램이 죽네요.. 디버깅을 타보니, msg_.rd_ptr() 이 directive[] 로 연결이 되는데, 이 포인터가 의미가 없는 값이 들어있네요.. Bad_ptr 로…
    제 생각으로는, Commander 가 죽으면서, Bad_ptr 이 되어버린거 같은데…
    어떻게 해결할 수 있을지, 감이 잘 안옵니다… ㅠㅠ;

    • fini() 까지는 호출이 되고 나서 죽습니다…
      ServiceLoader 에서 스트링을 참조하려다가 죽는듯한데…
      (글 수정이 안되는게 좀 불편하네요~ㅎㅎ, 강의, 감사하게 보고 있습니다.)

      • svc.conf 파일이 정상적인지 일단 확인하셔야 할 듯합니다.
        단순 문자열 관련 에러라면, 조금만 노력하시면 에러를 잡아낼 수 있을 겁니다.
        조금 더 자세한 정보를 제공해준다면, 조금 더 도움이 되는 답변이 가능합니다.

      • 저도 같은 문제거 같습니다. fini()까지 호출되고 main()에서 error가 나서 죽습니다.
        혹시 알고 계시면 알려주세요~~ 감사합니다. 강의 감사 드립니다.

  5. 안녕하십니까.
    이런질문도 드려도 될지…

    제가 window mfc dialog 기반에서 Proactor을 이용해 서버를 만들려고 하는데

    ACE_Asynch_Acceptor의 open 함수를 호출하면 정상적으로 동작은 되는데, 종료시(런타임시..)
    memory leak이 발생합니다.

    mfc 가 아닌 win32 console 로 프로젝트를 생성하면 memory leak이 발생하지 않습니다.
    혹시 이 문제에대한 해결방법이 있을까요? 도움이 필요합니다?

    감사합니다.

    • 해당 momory leak 현상은 proactor thread 종료된 후
      accept close가 호출됨으로 cancel io 대한 처리 부분이 에러가 발생한 경우일듯 합니다.
      (close도 요청되는 요청 작업이고, 해당하는 응답 처리가 되어야 하지만, 이미 쓰레드가 죽었음으로 이런 경우엔 불가능합니다.)

      따라서 accept close 후 proactor thread 종료로 변경시 해당 leak은 발생하지 않을 듯 합니다.

  6. 윈도우OS 에서 사용할 ACE 공유 메모리 class 가 있을까요? ACE_Shared_Memory_SV 는 open 에러가 나네요..

Leave a Reply to gfcpt Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>