~aleteoryx/nex.nelua

a9dc10f1439263882976e3db9db9349281a21b42 — Aleteoryx 17 days ago 2716ae0
socket listener done
1 files changed, 134 insertions(+), 26 deletions(-)

M socket.nelua
M socket.nelua => socket.nelua +134 -26
@@ 4,6 4,8 @@
  but for now it's a standalone function.
]]

-- ## pragmas.nogc = true

-- c imports
## cinclude '<sys/socket.h>'
local AF_INET: cint <cimport, nodecl>


@@ 15,12 17,15 @@ local SOCK_STREAM: cint <cimport, nodecl>
-- local SOCK_RAW: cint <cimport, nodecl>

local MSG_DONTWAIT: cint <cimport, nodecl>
local MSG_PEEK: cint <cimport, nodecl>

local function c_socket(domain: cint, type: cint, protocol: cint): cint <cimport 'socket', nodecl> end
local function c_accept(fd: cint, address: pointer, address_len: *cint): cint <cimport 'accept', nodecl> end
local function c_listen(fd: cint, backlog: cint): cint <cimport 'listen', nodecl> end

local function c_send(fd: cint, buf: *[0]byte, len: usize, flags: cint): isize <cimport 'send', nodecl> end
local function c_recv(fd: cint, buf: *[0]byte, len: usize, flags: cint): isize <cimport 'recv', nodecl> end


## cinclude '<arpa/inet.h>'
local function c_inet_pton(af: cint, src: cstring <const>, dst: pointer): cint <cimport 'inet_pton', nodecl> end


@@ 65,10 70,12 @@ local function c_close(fd: cint): cint <cimport 'close', nodecl> end
-- end c imports

require 'string'
require 'hashmap'
require 'stringbuilder'
require 'coroutine'
require 'math'
require 'C.stdio'
require 'allocators.pool'
require 'allocators.default'


local function fakeuse(...: varargs) end


@@ 86,20 93,106 @@ local function die_errno(cond: boolean, msg: string): void
end

local handler_req = @enum{
  init = 0,
  close = 0,
  read_line,
  write,
  close
  write
}

local handler_state = @record{
  fd: cint,
  co: coroutine,
  last_req: handler_req
  last_req: handler_req,
  buf: stringbuilder
}

function handler_state:step(epfd: cint, state: *handler_state): (boolean)
  return false
function handler_state:step(epfd: cint, key: uint32): (boolean)
  local should_call = false
  switch self.last_req do
   case handler_req.write then fallthrough
   case handler_req.close then
    should_call = true
   case handler_req.read_line then
    local maxread <comptime> = 4096
    local buf: [maxread]byte = {}
    local len = c_recv(self.fd, &buf, maxread, MSG_PEEK)
    if len == -1 then
      local errmsg = "Internal error, please try again later.\n"
      C.perror("Error reading for handler")
      c_send(self.fd, errmsg.data, errmsg.size, 0)
      c_epoll_ctl(epfd, EPOLL_CTL_DEL, self.fd, nilptr)
      c_close(self.fd)
      return false
    end
    local lf_idx = -1
    for i=0,<len do
      if buf[i] == '\n'_b then
        lf_idx = i
        break
      end
    end

    if lf_idx ~= -1 then
      -- the kernel will notify us again if there's more data
      len = c_recv(self.fd, &buf, lf_idx + 1, 0)
      self.buf:write((@span(byte))({data = &buf, size = lf_idx}))
      self.co:push(self.buf:promote())
      self.buf = stringbuilder()
      should_call = true
    else
      len = c_recv(self.fd, &buf, maxread, 0)
      self.buf:write((@span(byte))({data = &buf, size = lf_idx}))
    end
  end

  if should_call then
    local req: handler_req
    while true do
      local ok, err = self.co:resume()
      if not ok then
        local errmsg = "Internal error, please try again later.\n"
        print("Error in handler:", err)
        c_send(self.fd, errmsg.data, errmsg.size, 0)
        c_epoll_ctl(epfd, EPOLL_CTL_DEL, self.fd, nilptr)
        c_close(self.fd)
        return false
      end
      self.co:pop(&req)
      if req == handler_req.write then
        local data: string
        self.co:pop(&data)
        local len = c_send(self.fd, data.data, data.size, 0)
        if len == -1 then
          C.perror("Error writing for handler")
          c_epoll_ctl(epfd, EPOLL_CTL_DEL, self.fd, nilptr)
          c_close(self.fd)
          return false
        end
      else break end
    end

    if req == handler_req.close then
      c_epoll_ctl(epfd, EPOLL_CTL_DEL, self.fd, nilptr)
      c_close(self.fd)
      return false
    end
    if req == handler_req.read_line then
      local event: epoll_event = {
        events = EPOLLIN,
        data = { u32 = key }
      }
      c_epoll_ctl(epfd, EPOLL_CTL_MOD, self.fd, &event)
    else
      local event: epoll_event = {
        events = 0,
        data = { u32 = key }
      }
      c_epoll_ctl(epfd, EPOLL_CTL_MOD, self.fd, &event)
    end

    self.last_req = req
  end

  return true
end

local function listen_sock(sock: cint, handler: function(): void): void


@@ 107,12 200,11 @@ local function listen_sock(sock: cint, handler: function(): void): void
  local epfd = c_epoll_create(0)
  die_errno(epfd == -1, "couldn't create epoll instance")

  -- opt out of GC because we give references to the kernel
  local alloc: PoolAllocator(@handler_state, 1024) -- probably overkill on max size
  local handlers: hashmap(uint32, handler_state)

  local sock_event: epoll_event = {
    events = EPOLLIN,
    data = { ptr = nilptr }
    data = { u32 = 0 }
  }

  c_epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &sock_event)


@@ 122,35 214,40 @@ local function listen_sock(sock: cint, handler: function(): void): void
    local events: [maxevents]epoll_event = {}
    local event_count = c_epoll_wait(epfd, &events, maxevents, -1)
    die_errno(event_count == -1, "couldn't wait on epoll instance")
    print("got events from epoll_wait:", event_count)
    for i = 0, < event_count do
      if events[i].data.ptr == nilptr then
      if events[i].data.u32 == 0 then
        local fd = c_accept(sock, nilptr, nilptr)
        local state = alloc:new(@handler_state)
        if state == nilptr then -- drop connections if allocation fails
        if #handlers >= 1024 then -- drop connections if allocation fails
          local errmsg = "The server is overloaded, please try again later.\n"
          c_send(fd, errmsg.data, errmsg.size, 0)
          c_close(fd)
          continue
        end
        state.fd = fd
        state.co = coroutine.create(handler)
        state.last_req = handler_req.init
        local ok, events = state:step()

        local key: uint32 = math.random(0_u32, (@uint32)(2^32-1))

        local state: handler_state = {
          fd = fd,
          co = coroutine.create(handler),
          last_req = handler_req.close
        }

        local fd_event: epoll_event = { data = { u32 = key } }
        c_epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &fd_event)

        local ok = state:step(epfd, key)
        if ok then
          local fd_event: epoll_event = {
            events = events,
            data = { ptr = (@pointer)(state) }
          }
          c_epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &fd_event)
          handlers[key] = state
        else
          c_epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nilptr)
          local errmsg = "Internal error, please try again later.\n"
          c_send(fd, errmsg.data, errmsg.size, 0)
          c_close(fd)
          alloc:delete()
        end
      else
        print("TODO")
        if not handlers[events[i].data.u32]:step(epfd, events[i].data.u32) then
          handlers:remove(events[i].data.u32)
        end
      end
    end
  end


@@ 183,8 280,19 @@ local function listen_tcp(address: string, handler: function(): void): void
    ]] ]==]
    die_errno(c_err == -1, "couldn't bind TCP socket")

    defer c_close(fd) end

    listen_sock(fd, handler)
  end
end

listen_tcp("127.0.0.1:1900", function() end)
listen_tcp("127.0.0.1:1901", function()
  local line: string
  repeat
    coroutine.yield(handler_req.read_line)
    coroutine.running():pop(&line)
    coroutine.running():push(line.."\n")
    coroutine.yield(handler_req.write)
  until line == "bye"
  coroutine.yield(handler_req.close)
end)