class Plugin::Streaming::Streamer

Attributes

service[R]
thread[R]

Public Class Methods

defevent(name, many=false, &proc) click to toggle source

イベントを登録する

Args

name

イベント名

many

オブジェクトをまとめて配列で受け取るかどうか

&proc

イベントを受け取るオブジェクト。

# File core/plugin/streaming/streamer.rb, line 13
def self.defevent(name, many=false, &proc)
  speed_key = "#{name}_queue_delay".to_sym
  define_method("_event_#{name}", &proc)
  if many
    define_method("event_#{name}"){ |json|
      @queue[name] ||= TimeLimitedQueue.new(HYDE, everytime{ (UserConfig[speed_key] || 100).to_f / 1000 }){ |data|
        begin
          __send__("_event_#{name}", data)
        rescue Exception => e
          warn e end }
      @threads[name] ||= everytime{ @queue[name].thread }
      @queue[name].push json }
  else
    define_method("event_#{name}"){ |json|
      @queue[name] ||= Queue.new
      @threads[name] ||= Thread.new{
        loop{
          begin
            sleep((UserConfig[speed_key] || 100).to_f / 1000)
            __send__("_event_#{name}", @queue[name].pop)
          rescue Exception => e
            warn e end } }
      queue_push(name, json) } end end
new(service, &on_connect) click to toggle source

Args

service

接続するService

on_connect

接続されたら呼ばれる

# File core/plugin/streaming/streamer.rb, line 40
def initialize(service, &on_connect)
  @service = service
  @thread = Thread.new(&method(:mainloop))
  @on_connect = on_connect
  @threads = {}
  @queue = {} end

Public Instance Methods

kill() click to toggle source

UserStreamを終了する

# File core/plugin/streaming/streamer.rb, line 59
def kill
  @thread.kill
  @threads.each{ |event, thread|
    thread.kill }
  @threads.clear
  @queue.clear end
mainloop() click to toggle source
# File core/plugin/streaming/streamer.rb, line 47
def mainloop
  service.streaming{ |q|
    if q and not q.empty?
      parsed = JSON.parse(q) rescue nil
      event_factory parsed if parsed end }
rescue Net::ReadTimeout
  raise
rescue => exception
  error exception
  raise end