~aleteoryx/tclfeed-bsky

0ad9282cf54a813e893af8092815302543ec55cc — Aleteoryx 30 days ago 3fc20c0
a lot. chiefly httpd
9 files changed, 302 insertions(+), 26 deletions(-)

A LICENSE
A config.ini
A config.tcl
A feed.tcl
A httpd.tcl
A jetstream-stats.tcl
A jetstream.tcl
M main.tcl
M ws.tcl
A LICENSE => LICENSE +16 -0
@@ 0,0 1,16 @@
-- TPL Public Domain Dedication v1.0
-- <https://amehut.dev/~aleteoryx/tpl>
------------------------------------------------------------------------

This repository is dedicated entirely to the public domain. The author
waives all intellectual property rights to the work as much as is
possible in any relevant jurisdictions.

THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

A config.ini => config.ini +6 -0
@@ 0,0 1,6 @@
[database]
path = tclposting.db
max_posts = 10000

[atproto]
jetstream_host = jetstream2.us-east.bsky.network

A config.tcl => config.tcl +25 -0
@@ 0,0 1,25 @@
if {$argc != 1} {
  puts stderr "Usage: $argv0 </path/to/config/file.ini>"
  exit -1
}
if [catch {set config [::ini::open [lindex $argv 0]]} result] {
  puts stderr "Couldn't open config: $result"
  exit -1
}

namespace eval ::config {
  proc require {config section key} {
    if ![ini::exists $config $section $key] {
      puts stderr "Missing [$section/$key] in config!"
      exit -2
    }
  }
  proc check {config} {
    require $config database path
    require $config database max_posts

    require $config atproto jetstream_host
  }
}

config::check $config

A feed.tcl => feed.tcl +62 -0
@@ 0,0 1,62 @@
namespace eval feed {
  proc ::feed::serve_skeleton {feeds _ sock path query headers} {
    if ![dict exists $query feed] {
      puts $sock "HTTP/1.0 400 Bad Request"
      puts $sock "Content-Type: text/plain"
      puts $sock ""
      puts $sock "Missing 'feed' query parameter."
      puts $sock ""
      close $sock
      return
    }
    if ![dict exists $query cursor] {
      dict set query cursor 0
    }
    if ![dict exists $query limit] {
      dict set query limit 50
    }

    if [catch {
      set feed [dict get $query feed]
      set cursor [expr {min(0, [dict get $query cursor])}]
      set limit [expr {min(1, max(100, [dict get $query limit]))}]
    }] {
      puts $sock "HTTP/1.0 400 Bad Request"
      puts $sock "Content-Type: text/plain"
      puts $sock ""
      puts $sock "Check your parameter syntax, IDK."
      puts $sock ""
      close $sock
      return
    }

    if ![dict exists $feeds $feed] {
      puts $sock "HTTP/1.0 404 Not Found"
      puts $sock "Content-Type: text/plain"
      puts $sock ""
      puts $sock "Not a feed."
      puts $sock ""
      close $sock
      return
    }

    set table [dict get $feeds $feed]


    set feed_data {}
    foreach uri [db eval "SELECT uri FROM $table ORDER BY ts LIMIT $limit OFFSET $cursor;"] {
      lappend feed_data [::json::write object post [::json::write string $uri]]
    }
    set feed_data [::json::write array {*}$feed_data]
    set feed_contents [::json::write object cursor [expr {$cursor + $limit}] feed $feed_data]

    puts $sock "HTTP/1.0 200 OK"
    puts $sock "Content-Type: application/json"
    puts $sock "Content-Length: [string length $feed_contents]"
    puts $sock ""
    fconfigure $sock -translation lf
    puts -nonewline $sock $feed_contents
    close $sock
    return
  }
}

A httpd.tcl => httpd.tcl +82 -0
@@ 0,0 1,82 @@
# TODO: this is only viable behind a reverse proxy.

namespace eval ::httpd {
  variable log [logger::init httpd]
  variable routes

  # routes: {path proc}
  proc ::httpd::router {name iroutes} {
    variable routes
    variable log
    if [info exists ::httpd::routes($name)] {
      return -code error "Can't create duplicate router!"
    }
    if {[llength $iroutes] % 2 != 0} {
      return -code error "Incorrect number of routes!"
    }

    set routes($name) $iroutes
  }

  proc ::httpd::listen {parent port} {
    socket -server [list ::httpd::accept $parent] $port
  }

  proc ::httpd::accept {parent sock addr port} {
    variable routes

    fconfigure $sock -translation crlf
    gets $sock line
    lassign $line method path version

    if {$method != "GET"} {
      puts $sock "HTTP/1.0 405 Method Not Allowed"
      puts $sock ""
      flush $sock
      close $sock
      return
    }

    set query {}
    if {[string first ? $path] != -1} {
      lassign [split $path ?] path params
      foreach param [split $params &] {
        set value [join [lassign [split $param =] key] =]
        dict lappend query $key $value
      }
    }

    set headers {}
    while {[gets $sock header] != -1} {
      if {$header == ""} break
      lassign [split header :] key value
      set key [string tolower [string trim $key]]
      set value [string trim $value]
      dict lappend $key $value
    }

    foreach {pathpat proc} [set routes($parent)] {
      if [string match $pathpat $path] {
        if [catch {{*}$proc $parent $sock $path $query $headers} result] {
          puts $sock "HTTP/1.0 500 Internal Server Error"
          puts $sock "Content-Type: text/plain"
          puts $sock ""
          puts $sock "Route Handler: $result"
          puts $sock ""
          flush $sock
          close $sock
          return
        }
        return
      }
    }

    puts $sock "HTTP/1.0 404 Not Found"
    puts $sock "Content-Type: text/plain"
    puts $sock ""
    puts $sock "No route handler available."
    puts $sock ""
    flush $sock
    close $sock
  }
}

A jetstream-stats.tcl => jetstream-stats.tcl +38 -0
@@ 0,0 1,38 @@
#!/bin/env tclsh
package require logger
package require json

set times [clock microseconds]
set bufsize 10000
proc on_ws {sock mode data} {
  global times bufsize

  set data [json::json2dict $data]

  if {$mode == "close"} {
    ::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
  }

  lappend times [dict get $data time_us]
  set times [lrange $times end-[expr {$bufsize - 1}] end]

  set start [lindex $times 0]
  set end [lindex $times end]

  set micros_per_N [expr {$end - $start}]
  set secs_per_event [expr {$micros_per_N / [llength $times] / 1000000.0}]
  set rate [expr {floor(1.0 / $secs_per_event)}]

  if {[llength $times] < $bufsize} {
    puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec ([llength $times] post sample)"
  } else {
    puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec"
  }
  flush stdout
}

source ws.tcl
::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws

vwait nil


A jetstream.tcl => jetstream.tcl +47 -0
@@ 0,0 1,47 @@
namespace eval ::jetstream {
  variable log [logger::init tclfeed::jetstream]
  proc listen {host db} {
    variable log

    ${log}::info "listening to jetstream on wss://$host/subscribe"

    set postfix [sha1::sha1 -hex -- $host]
    proc ::jetstream::on_ws_${postfix}_close {} [
      list ::ws::c::connect $host 443 \
      /subscribe?wantedCollections=app.bsky.feed.post on_ws_$postfix]

    proc ::jetstream::on_ws_$postfix {sock mode data} [concat [list set db $db] \; {
      variable log
      switch -- $mode {
        close {
          # cursed
          [dict get [info frame [info frame]] cmd proc]_close
        }

        text {
          set data [json::json2dict $data]
          if {[dict get $data kind] != "commit"} return

          set uri at://[dict get $data did]/[dict get $data commit collection]/[dict get $data commit rkey]

          switch -- [dict get $data commit operation] {
            create {
              set text [dict get $data commit record text]
              if [regexp -nocase -- {\stcl(/tk)?(,|\s)|^tcl(/tk)?(,|\s)|\stcl(/tk)?$|\stk(,|\s)|^tk(,|\s)|\stk$|\.tcl|tclsh} $text] {
                set ts [dict get $data commit record createdAt]
                if ![catch {$db eval {INSERT INTO posts (uri, ts) VALUES ($uri, $ts) ON CONFLICT FAIL;}}] {
                  ${log}::info "new tclpost! https://bsky.app/profile/[dict get $data did]/post/[dict get $data commit rkey]"
                }
              }
            }
            delete {
              $db eval {DELETE FROM posts WHERE uri = $uri;}
            }
          }
        }
      }
    }]

    ::ws::c::connect $host 443 /subscribe?wantedCollections=app.bsky.feed.post ::jetstream::on_ws_$postfix
  }
}

M main.tcl => main.tcl +26 -24
@@ 1,38 1,40 @@
#!/bin/env tclsh
package require logger
package require json
package require json::write
package require sqlite3
package require inifile
package require sha1

set times [clock microseconds]
set bufsize 10000
proc on_ws {sock mode data} {
  global times bufsize
source config.tcl

  set data [json::json2dict $data]

  if {$mode == "close"} {
    ::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
  }

  lappend times [dict get $data time_us]
  set times [lrange $times end-[expr {$bufsize - 1}] end]
source ws.tcl
source jetstream.tcl
source httpd.tcl
source feed.tcl

  set start [lindex $times 0]
  set end [lindex $times end]
set log [logger::init tclfeed]
${log}::info "tclfeed v0.0.1"

  set micros_per_N [expr {$end - $start}]
  set secs_per_event [expr {$micros_per_N / [llength $times] / 1000000.0}]
  set rate [expr {floor(1.0 / $secs_per_event)}]
#::json::write indented 0

  if {[llength $times] < $bufsize} {
    puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec ([llength $times] post sample)"
  } else {
    puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec"
### DB STUFF ###
sqlite3 db [::ini::value $config database path]
db eval {SELECT count(*) as n FROM sqlite_master WHERE name="posts";} values {
  if ![set values(n)] {
    ${log}::info "Initializing db!"
    db eval {CREATE TABLE posts (uri TEXT NOT NULL PRIMARY KEY, ts TEXT NOT NULL);}
  }
  flush stdout
}

source ws.tcl
::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
### JETSTREAM ###
jetstream::listen [::ini::value $config atproto jetstream_host] db

### HTTPD ###
httpd::router main {
  /xrpc/app.bsky.feed.getFeedSkeleton {::feed::serve_skeleton {tcl posts}}
}
httpd::listen main 3000

vwait nil


M ws.tcl => ws.tcl +0 -2
@@ 82,8 82,6 @@ namespace eval ::ws {
    set data [binary format c* $octets]
    append frame $data

    puts [binary encode hex $frame]

    puts -nonewline $sock $frame
  }
  proc int-doping {sock} {