Thursday, April 14, 2011

Project Idea

I want to (re)create a project I did during my time at AOL: a data pipeline for collecting system stats. The pipeline consists of an agent per host that collects the stats, a message-routing infrastructure, and a data storage endpoint. My current idea is to build all of this with Ruby, 0MQ, and MySQL. Later I may move the time-series data to something other than MySQL.

Mostly I want to play with 0MQ.

Sunday, April 3, 2011

Detecting remote connection closed

Normally you detect that the peer closed the socket in one of two ways: read(2) returns 0 bytes (an orderly close), or read(2) fails with -1 and errno set to ECONNRESET (a reset, which is what I ran into on Mac OS X). But I have not figured out how EventMachine notifies the user of a peer disconnect.
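For comparison, here is a minimal sketch of how the same two cases surface in plain Ruby sockets, without EventMachine. The helper names `read_or_detect_close` and `demo_peer_close` are made up for this illustration. In Ruby, an orderly close shows up as `EOFError` from `readpartial`, and a reset shows up as `Errno::ECONNRESET`.

```ruby
require 'socket'

# Hypothetical helper: returns the data read, :eof when the peer closed
# the connection cleanly, or :reset when the connection was reset.
def read_or_detect_close(sock, maxlen = 1024)
  sock.readpartial(maxlen)
rescue EOFError
  :eof
rescue Errno::ECONNRESET
  :reset
end

# Hypothetical demo: connect to a throwaway local server, send one line,
# close the client, and observe what the server side sees.
def demo_peer_close
  server = TCPServer.new('127.0.0.1', 0)   # port 0 = let the OS pick a port
  client = TCPSocket.new('127.0.0.1', server.addr[1])
  peer   = server.accept

  client.write("hello\n")
  first = read_or_detect_close(peer)       # the line that was sent

  client.close
  second = read_or_detect_close(peer)      # what a closed peer looks like

  [first, second]
ensure
  [peer, server].each { |s| s.close if s && !s.closed? }
end

p demo_peer_close
```

On a loopback connection the second read should report the orderly close, since the client had no unread data when it closed.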

EM (EventMachine) calls unbind when a connection closes. But as far as I can tell, unbind fires both for a local close via close_connection and for a close by the peer, with no means to detect the difference.
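One possible workaround, sketched below, is to record when our own side asks for the close and check that flag in unbind. The module name `CloseTracking` and the method `closed_locally?` are made up for this sketch and are not part of EventMachine. It assumes the module is included in an `EM::Connection` subclass and that `close_connection_after_writing` goes through `close_connection`. Note that a false result only means "we did not ask for this close"; it could be the peer, a network error, or the reactor shutting down.

```ruby
# Hypothetical mixin (not part of EventMachine): remembers whether this
# side initiated the close, so unbind can tell the cases apart.
module CloseTracking
  # Wraps the connection's close_connection and records that we called it.
  def close_connection(*args)
    @closed_locally = true
    super
  end

  # True only if close_connection was called on this object.
  def closed_locally?
    !!@closed_locally
  end
end

# Intended use (assumes the eventmachine gem is loaded):
#
#   class CmdExecutor < EM::Connection
#     include CloseTracking
#
#     def unbind
#       who = closed_locally? ? "closed by us" : "not closed by us (peer?)"
#       console "connection closed: #{who}"
#     end
#   end
```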

Saturday, April 2, 2011

Simple Chat Server

Uses EM::Channel to pass messages between connections on two server ports.

Run a simple `nc localhost 7000` (or `nc localhost 7001`) to talk to the server.

Learned a bunch of little things, mostly about transliterating things from Perl to Ruby.

Spent too much time trying to get /^(\w+)(?:\s+(\w+))*/ to do what I thought it would do. It doesn't even do what I expected in Perl: a repeated capture group keeps only its last match. Of course, I didn't arrive at the obvious str.split(' ') until I "gave up" on the regex and thought of a programmatic way to do what that hypothetical regex was supposed to do.
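A small sketch of the difference, using a made-up input line. The method names `parse_with_regex` and `parse_with_split` are only for this illustration. The second capture group sits inside a repeated group, so only its final iteration survives in the match data, while split returns every word.

```ruby
# Hypothetical helpers comparing the two approaches on the same input.

# Returns the regex captures: the first word, then only the LAST of the
# remaining words, because a repeated group overwrites its capture.
def parse_with_regex(line)
  m = /^(\w+)(?:\s+(\w+))*/.match(line)
  m ? m.captures : []
end

# Returns every whitespace-separated word.
def parse_with_split(line)
  line.split(' ')
end

p parse_with_regex("say hello big world")  # => ["say", "world"]
p parse_with_split("say hello big world")  # => ["say", "hello", "big", "world"]

# The server code uses the split form with a splat to peel off the command:
cmd, *args = parse_with_split("say hello big world")
p cmd   # => "say"
p args  # => ["hello", "big", "world"]
```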

Then "chan_sid = ..." didn't work, but "self.chan_sid = ..." did. I have to figure that out.
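The likely explanation is that Ruby parses a bare `name = value` inside a method as assignment to a new local variable, so the setter generated by attr_accessor is never called. Only an explicit receiver, as in `self.name = value`, sends the `name=` message. A minimal sketch with a made-up class `SidHolder`:

```ruby
# Hypothetical class showing local assignment versus a setter call.
class SidHolder
  attr_accessor :chan_sid

  # Creates a local variable named chan_sid; @chan_sid stays untouched.
  def assign_without_self(value)
    chan_sid = value
    chan_sid
  end

  # Calls the chan_sid= setter, which stores the value in @chan_sid.
  def assign_with_self(value)
    self.chan_sid = value
  end
end

holder = SidHolder.new
holder.assign_without_self(1)
p holder.chan_sid  # => nil
holder.assign_with_self(1)
p holder.chan_sid  # => 1
```

Reading works without `self` because a bare `chan_sid` with no local of that name in scope is treated as a method call; only assignment is ambiguous.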

#!/usr/bin/env ruby -w

require 'rubygems'
require 'eventmachine'

class CmdExecutor < EM::Connection
  include EM::P::LineText2
  attr_reader   :port
  attr_reader   :cmd_prompt
  attr_reader   :rsp_prefix
  attr_reader   :greetings
  attr_reader   :chan
  attr_accessor :chan_sid
  def initialize port, chan, *args
    p port, chan, args
    @port = port
    @chan = chan
    @cmd_prompt = "[#{port}]> "
    @rsp_prefix = "[#{port}]# "
    @greetings = "Hello how may I be of assistance?"
    console "Setting port=#{@port}"
    console "Setting prompt=#{@cmd_prompt}"
    console "#{self.class} initialized"
  end
  def console *args
    puts "[#{port}] #{args.join(' ')}"
  end
  def send_line line
    send_data(line+"\n")
  end
  def post_init
    console "\"post_init\" for port=#{port}"
    self.chan_sid = chan.subscribe { |msg| receive_chan msg }
    console "channel sid = ##{chan_sid}"
    send_line greetings
    send_data cmd_prompt
  end
  def receive_chan msg
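    # msg is [sender_port, cmd, *args]; the sender's port is unused here,
    # and naming it _port avoids shadowing the port reader.
    _port, cmd, *args = msg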
    console "FROM CHANNEL(#{chan_sid}): #{cmd} #{args.join ' '}"
    send_line rsp_prefix + "#{cmd} #{args.join ' '}"
  end
  def receive_line line
    cmd, *args = line.split(' ')
    # A blank line has no command; just show the prompt again.
    return send_data(cmd_prompt) if cmd.nil?
    cmd = cmd.downcase
    console "RECEIVED CMD: #{cmd} #{args.join(' ')}"
    case cmd
    when "close"
      console "\"close\" command called"
      console "close_connection will be issued"
      send_line rsp_prefix + cmd
      close_connection_after_writing
    when "quit"
      console "\"quit\" command called"
      console "EM.stop will be issued"
      send_line rsp_prefix + cmd
      EM.next_tick { EM.stop }
    else
      chan.push([port, cmd, *args])
      console "sent to chan (#{chan_sid}) \"#{cmd} #{args.join ' '}\""
      send_data cmd_prompt
    end # case cmd
  end # def receive_line
  def unbind
    console "connection closed"
    chan.unsubscribe(chan_sid)
  end
  #private
end


EM.run {
  chan = EM::Channel.new
  EM.start_server("0.0.0.0", 7000, CmdExecutor, 7000, chan)
  EM.start_server("0.0.0.0", 7001, CmdExecutor, 7001, chan)
}