Coroutines As Event Handlers

lua-users home
wiki

This is a very basic Supervisor (or event dispatcher), similar to the ones found in Copas and in Programming in Lua (but not based on either of them). It was written partly in response to a question on the mailing list, and I've tried to comment it fully; there is an example of using it at the end. The question it attempts to answer is:

So, how would one do something like...

result = block(self, event)

...where block() is a SWIG wrapped function that takes the script context (i.e. state) and an event as arguments, and blocks (yields) the script until the event occurs, after which the details are copied into a result structure (resumes)?

The response was, why not do it in Lua? The scaffolding below can handle hundreds of thousands of event dispatches per second, so it is not likely that writing it in C will help much; CPU time will be dominated by the cost of actually processing the requests.

-- Local aliases for the coroutine library functions used throughout this
-- file (locals are cheaper to reach than fields of a global table).
local cocreate = coroutine.create
local resume = coroutine.resume
local yield = coroutine.yield
local costatus = coroutine.status



-- Write one log line to stderr: a UTC timestamp ("YYYY-MM-DD HH:MM:SS "),
-- then every argument in order with no separator, then a newline.
--
-- Strings and numbers are handed to write() untouched; any other value
-- (nil, boolean, table, coroutine, ...) is passed through tostring() first,
-- because file:write() raises an error for those and a log call should
-- never be the thing that takes its caller down.
local function LOG(...)
  -- Spelled out rather than "%F %T": those are C99 conversions that C89
  -- builds of Lua (e.g. on Windows) reject, while this form produces the
  -- same text everywhere.
  io.stderr:write(os.date("!%Y-%m-%d %H:%M:%S "))
  for i = 1, select("#", ...) do
    local piece = select(i, ...)
    local kind = type(piece)
    if kind ~= "string" and kind ~= "number" then
      piece = tostring(piece)
    end
    io.stderr:write(piece)
  end
  io.stderr:write("\n")
end



-- Supervisor / event dispatcher. This function expects to be run as a
-- coroutine: each resume delivers one message (the very first message
-- arrives as the function's own arguments). After handling a message it
-- yields the handler's result and waits for the next one; once it has been
-- told to quit it returns instead.
--
-- Messages understood:
--   "introduce", name, func, ...  create a worker coroutine from func, give
--                                 it a name for logging, and run it with
--                                 the remaining arguments until it blocks
--   "signal", event, ...          resume every worker waiting for event,
--                                 passing it (event, ...); the result is
--                                 true/false depending on whether anyone is
--                                 still waiting for that event afterwards,
--                                 or nil if nobody was waiting at all
--   "status"                      log who is waiting for what
--   "done"                        shut the supervisor down
-- Anything else is logged as an unknown message and otherwise ignored.
--
-- Workers talk to the supervisor by yielding ("waitfor", event).
function supervisor(...)
  -- events[event] is the set of coroutines (coro -> true) waiting for that
  -- event; we assume we can handle the waiters in any order.
  local events = {}
  -- Associates coroutines with names, mostly for debugging.
  local name = {}
  -- Set by the "done" message to end the main loop.
  local done = false

  -- Remove every reference to a coroutine from all events and forget its
  -- name. Done when a coroutine dies or misbehaves. Weak tables could do
  -- part of this, but an explicit hook also leaves room for cleaning up
  -- other per-coroutine data.
  local function remove(coro)
    for _, coros in pairs(events) do
      coros[coro] = nil
    end
    name[coro] = nil
  end

  -- Log "Coroutine <name> <msg>:" followed by the coroutine's traceback,
  -- one log line per traceback line. Must be called BEFORE remove(coro),
  -- otherwise the name is already gone.
  local function log_traceback(coro, msg, err)
    LOG("Coroutine ", tostring(name[coro]), " ", msg, ":")
    -- debug.traceback returns a non-string message untouched, so stringify
    -- error objects first to keep the gmatch below valid.
    local tb = debug.traceback(coro, tostring(err))
    for line in tb:gmatch("[^\n]+") do LOG("  ", line) end
  end

  -- The core routine: digest the results of a coroutine.resume. `ok` and
  -- `todo` are resume's first two results; for a live coroutine `todo` is
  -- the verb it yielded and `event` the verb's argument. The only verb we
  -- know is "waitfor", but others could be added here.
  local function handle_resume(coro, ok, todo, event)
    if costatus(coro) == "dead" then
      -- Finished or crashed; on a crash `todo` holds the error value.
      -- Log first: remove() erases the name the log line needs.
      if not ok then log_traceback(coro, "failed with error", todo) end
      remove(coro) -- always get rid of it.
    elseif todo ~= "waitfor" then
      log_traceback(coro, "unknown request " .. tostring(todo),
                          "bad return value")
      remove(coro)
    elseif event == nil or event ~= event then
      -- nil and NaN cannot be table keys; treat them as a bad request
      -- instead of letting the table assignment blow up the supervisor.
      log_traceback(coro, "waitfor without a usable event",
                          "bad return value")
      remove(coro)
    else
      -- We don't care whether the event is known; just queue the waiter.
      local q = events[event]
      if q == nil then q = {}; events[event] = q end
      q[coro] = true
    end
  end

  -- A table of actions; essentially a case statement keyed by message.
  local handler = {}

  -- Request a clean exit (there is currently nothing to clean up).
  function handler.done()
    done = true
  end

  -- Debugging aid: log one "<name> is waiting for <event>" line per
  -- blocked worker, sorted by name. One record per waiter, so workers that
  -- happen to share a name are all reported with their own event.
  function handler.status()
    local waiting = {}
    for evt, coros in pairs(events) do
      for coro in pairs(coros) do
        waiting[#waiting + 1] = {
          who = tostring(name[coro]),
          evt = tostring(evt),
        }
      end
    end
    table.sort(waiting, function(a, b) return a.who < b.who end)
    for _, w in ipairs(waiting) do
      LOG(w.who, " is waiting for ", w.evt)
    end
  end

  -- Introduce a new actor (coroutine) into the system and run it, with the
  -- extra arguments, until it blocks for the first time.
  function handler.introduce(who, func, ...)
    local coro = cocreate(func)
    -- Fall back to the coroutine's own identity so logs always have a name.
    if who == nil then who = tostring(coro) end
    name[coro] = who
    -- let it initialize itself
    return handle_resume(coro, resume(coro, ...))
  end

  -- Send an event to whoever cares. Returns whether anyone is waiting for
  -- the event after delivery, or nothing if there were no waiters at all.
  function handler.signal(what, ...)
    local q = events[what]
    if q == nil or next(q) == nil then
      -- No-one cares, sniff. "Log" the fact
      LOG("Event ", tostring(what), " dropped into the bit bucket")
      return
    end
    -- Snapshot and clear the waiters before resuming any of them. A worker
    -- that immediately waits for the same event is re-inserted into q, and
    -- inserting keys into a table while pairs() is traversing it is
    -- undefined (clearing existing ones, as done here, is allowed).
    local waiters, n = {}, 0
    for coro in pairs(q) do
      n = n + 1
      waiters[n] = coro
      q[coro] = nil -- handled
    end
    for i = 1, n do
      local coro = waiters[i]
      handle_resume(coro, resume(coro, what, ...))
    end
    return next(q) ~= nil
  end

  -- Fallback "handler" for unknown messages: log their arguments, quoting
  -- strings so they can be told apart from other values.
  local function logargs(...)
    local t = {n = select("#", ...), ...}
    if t.n > 0 then
      for i = 1, t.n do
        local ti = t[i]
        t[i] = (type(ti) == "string" and "%q" or "%s")
               :format(tostring(ti))
      end
      LOG("..with arguments: ", table.concat(t, ", ", 1, t.n))
    end
  end

  -- Route lookups of unknown messages to logargs so that bad commands
  -- need no explicit test. The metatable is kept separate from the handler
  -- table so that "__index" is not itself reachable as a message.
  setmetatable(handler, {
    __index = function(_, what)
      LOG("Supervisor received unknown message ", tostring(what))
      return logargs
    end,
  })

  -- Auxiliary function to dispatch one message; it exists to capture the
  -- multiple values coming out of yield().
  local function handle(simonsays, ...)
    return handler[simonsays](...)
  end

  -- The main loop is a bit anti-climactic. The first message is handled
  -- before the loop; testing `done` at the top means a "done" sent as the
  -- very first message shuts down at once instead of waiting for one more.
  LOG("Starting up")
  local rv = handle(...)
  while not done do
    rv = handle(yield(rv))
  end
  LOG("Shutting down")
end



------------------------------------------------------------------------------

--  Sample very simple processor

------------------------------------------------------------------------------



-- Suspend the calling worker until `event` is signalled: yields the
-- ("waitfor", event) request and returns whatever values the worker is
-- resumed with.
local function block(event)
  return yield("waitfor", event)
end

-- Sample worker: blocks on "TICK" n times and checks that the k-th wake-up
-- delivers the event name "TICK" together with the counter value k.
local function process(n)
  for tick = 1, n do
    local evt, count = block("TICK")
    assert(evt == "TICK")
    assert(count == tick)
  end
end



------------------------------------------------------------------------------

-- Test driver in Lua

------------------------------------------------------------------------------



-- The supervisor runs as a coroutine of its own; the driver talks to it by
-- resuming it with a message (see send below).
local super = cocreate(supervisor)



-- Examine the outcome of resuming the supervisor. On success, hand back
-- its first result. On failure, log "supervisor crashed" plus the
-- supervisor's traceback (one log line per traceback line) and exit the
-- program with status 1.
local function check(ok, rv)
  if ok then return rv end

  LOG("supervisor crashed")
  local trace = debug.traceback(super, rv)
  for entry in trace:gmatch("[^\n]+") do
    LOG("  ", entry)
  end
  os.exit(1)
  return rv
end



-- Deliver one message (plus its arguments) to the supervisor coroutine and
-- pass the outcome of the resume through check().
local function send(msg, ...)
  return check(resume(super, msg, ...))
end



-- Command line: [number of worker processes] [ticks each worker waits for].
-- `arg` only exists when run by the standalone interpreter, and a missing
-- or non-numeric argument falls back to its default (10 and 12).
local argv = arg or {}
local nprocess = tonumber(argv[1]) or 10
local nreps = tonumber(argv[2]) or 12

-- Start the workers; each runs until its first block on "TICK".
for i = 1, nprocess do
  send("introduce", ("p%04i"):format(i), process, nreps)
end

-- Keep ticking for as long as the supervisor reports that somebody is
-- still waiting for TICK; j is the counter delivered with each tick.
local j = 1
while send("signal", "TICK", j) do
  j = j + 1
end

-- This should be empty
send("status")
send("done")
LOG("Endcount ", j)

Define Supervisor. [1] ? --DavidManura

This definition comes from a paper about Erlang, and is somewhat circular, but it sums it up pretty well: "a behaviour that implements a specific style of supervisor-worker programming which allows supervision trees to be constructed dynamically."

In other words, a Supervisor is responsible for the care and maintenance of a bunch of workers, organizing their workload, sending them events (as in this example) and handling errors.

There are also some nice examples in [Termite] (pdf document, readable if you like parentheses).

RecentChanges · preferences
edit · history
Last edited March 2, 2007 4:23 am GMT (diff)