// EchoService.h
#pragma  once
 
 #include  <ace/Asynch_IO.h>
 #include  <ace/Message_Block.h>
 #include  <ace/Message_Queue.h>
 #include  <ace/Synch.h>
 #include  <ace/Proactor.h>
 #include  <ace/SOCK_Stream.h>
 
 class EchoService:public ACE_Handler
 {
 public:
     EchoService(void);
     ~EchoService(void);
 
    void open(ACE_Proactor* actor, const ACE_HANDLE& handle);
    int send( const  char* data);
 
 
    virtual  void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
    virtual  void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
 
 private:
     ACE_SOCK_Stream stream_;
 
     ACE_Asynch_Read_Stream read_require_;
     ACE_Message_Block read_msg_;
 
     ACE_Asynch_Write_Stream write_require_;
     ACE_Message_Queue<ACE_MT_SYNCH> write_msg_queue_;
     ACE_Atomic_Op<ACE_Thread_Mutex, bool> writable_;
 
 
 private:
    void recv_();
 
    void close();
 };
 
 
// EchoService.cpp
#include  "EchoService.h"
 #include  <iostream>
 
 /**
  * Builds an unconnected service: the stream holds no handle, the receive
  * buffer is 1024 bytes, and writing is disabled until open() is called.
  *
  * The initializers are listed in member declaration order (stream_,
  * read_msg_, writable_). That is the order the compiler uses regardless of
  * how the list is written, so spelling it this way avoids a misleading
  * list and the -Wreorder warning.
  */
 EchoService::EchoService()
     : stream_(ACE_INVALID_HANDLE),
       read_msg_(1024),
       writable_(false)
 {
 }
 
 
 /// Destruction tears the connection down by delegating to close().
 EchoService::~EchoService()
 {
     close();
 }
 
 
 void  
 EchoService::open(ACE_Proactor* actor, const  ACE_HANDLE& handle){
    this->proactor(actor);
    this->stream_.set_handle(handle);
 
    this->read_require_.open(*this, stream_.get_handle(), 0, this->proactor());
    this->write_require_.open(*this, stream_.get_handle(), 0, this->proactor());
 
    this->writable_ = true;
    this->recv_();
 
    if( this->write_msg_queue_.is_empty()) return;
    ACE_Message_Block* m;
    this->write_msg_queue_.dequeue_head(m);
    this->write_require_.write(*m, m->length());
    this->writable_ = false;
 }
 
 void  
 EchoService::close(){
    this->read_require_.cancel();
    this->write_require_.cancel();
    this->stream_.close();
    this->stream_.set_handle( ACE_INVALID_HANDLE);
    this->writable_ = false;
 }
 
 void  
 EchoService::recv_(){
     read_msg_.reset();
    this->read_require_.read( read_msg_, read_msg_.space() - 1);
 }
 
 void  
 EchoService::handle_read_stream (const  ACE_Asynch_Read_Stream::Result &result){
    if( !result.success()){
       this->close();
        std::cout<<std::endl<<"Error:"<<result.error();
       return;
    }
 
    if( 0==result.message_block().length()){
       this->close();
        std::cout<<std::endl<<"Disconnected!!";
       return;
    }
 
    result.message_block().copy("");
     std::cout<<std::endl<<"RECV:"<<(const  char*) result.message_block().rd_ptr();
    this->recv_();
 
 }
 
 int  
 EchoService::send( const  char* data){
    if( ACE_INVALID_HANDLE== stream_.get_handle()){
       return -1;
    }
 
    size_t len = ACE_OS::strlen( data )+1;
    ACE_Message_Block* m;
    ACE_NEW_RETURN(m, ACE_Message_Block(len), -1);
     m->copy( data);
 
    if( this->writable_.value() ){
       this->writable_ = false;
       return  this->write_require_.write(*m, m->length());
    }
 
    return  this->write_msg_queue_.enqueue_tail(m);
 }
 
 void  
 EchoService::handle_write_stream (const  ACE_Asynch_Write_Stream::Result &result){
    if( !result.success()){
       this->write_msg_queue_.enqueue_head(&result.message_block());
       this->close();
        std::cout<<std::endl<<"Error:"<<result.error();
       return;
    }
 
    if( 0==result.message_block().length()){
       result.message_block().release();
 
       if( this->write_msg_queue_.is_empty()){
          this->writable_ = true;
          return;
       }
 
       ACE_Message_Block* m;
       this->write_msg_queue_.dequeue_head(m);
       this->write_require_.write(*m, m->length());
       return;