From 0ad9282cf54a813e893af8092815302543ec55cc Mon Sep 17 00:00:00 2001 From: Aleteoryx Date: Fri, 22 Nov 2024 01:50:49 -0500 Subject: [PATCH] a lot. chiefly httpd --- LICENSE | 16 +++++++++ config.ini | 6 ++++ config.tcl | 25 ++++++++++++++ feed.tcl | 62 ++++++++++++++++++++++++++++++++++ httpd.tcl | 82 +++++++++++++++++++++++++++++++++++++++++++++ jetstream-stats.tcl | 38 +++++++++++++++++++++ jetstream.tcl | 47 ++++++++++++++++++++++++++ main.tcl | 50 ++++++++++++++------------- ws.tcl | 2 -- 9 files changed, 302 insertions(+), 26 deletions(-) create mode 100644 LICENSE create mode 100644 config.ini create mode 100644 config.tcl create mode 100644 feed.tcl create mode 100644 httpd.tcl create mode 100755 jetstream-stats.tcl create mode 100644 jetstream.tcl diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a705bc8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,16 @@ +-- TPL Public Domain Dedication v1.0 +-- +------------------------------------------------------------------------ + +This repository is dedicated entirely to the public domain. The author +waives all intellectual property rights to the work as much as is +possible in any relevant jurisdictions. + +THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL +WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE +AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL +DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR +PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER +TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +PERFORMANCE OF THIS SOFTWARE. diff --git a/config.ini b/config.ini new file mode 100644 index 0000000..201fd05 --- /dev/null +++ b/config.ini @@ -0,0 +1,6 @@ +[database] +path = tclposting.db +max_posts = 10000 + +[atproto] +jetstream_host = jetstream2.us-east.bsky.network diff --git a/config.tcl b/config.tcl new file mode 100644 index 0000000..b20b968 --- /dev/null +++ b/config.tcl @@ -0,0 +1,25 @@ +if {$argc != 1} { + puts stderr "Usage: $argv0 " + exit -1 +} +if [catch {set config [::ini::open [lindex $argv 0]]} result] { + puts stderr "Couldn't open config: $result" + exit -1 +} + +namespace eval ::config { + proc require {config section key} { + if ![ini::exists $config $section $key] { + puts stderr "Missing [$section/$key] in config!" + exit -2 + } + } + proc check {config} { + require $config database path + require $config database max_posts + + require $config atproto jetstream_host + } +} + +config::check $config diff --git a/feed.tcl b/feed.tcl new file mode 100644 index 0000000..b4686b6 --- /dev/null +++ b/feed.tcl @@ -0,0 +1,62 @@ +namespace eval feed { + proc ::feed::serve_skeleton {feeds _ sock path query headers} { + if ![dict exists $query feed] { + puts $sock "HTTP/1.0 400 Bad Request" + puts $sock "Content-Type: text/plain" + puts $sock "" + puts $sock "Missing 'feed' query parameter." + puts $sock "" + close $sock + return + } + if ![dict exists $query cursor] { + dict set query cursor 0 + } + if ![dict exists $query limit] { + dict set query limit 50 + } + + if [catch { + set feed [dict get $query feed] + set cursor [expr {min(0, [dict get $query cursor])}] + set limit [expr {min(1, max(100, [dict get $query limit]))}] + }] { + puts $sock "HTTP/1.0 400 Bad Request" + puts $sock "Content-Type: text/plain" + puts $sock "" + puts $sock "Check your parameter syntax, IDK." + puts $sock "" + close $sock + return + } + + if ![dict exists $feeds $feed] { + puts $sock "HTTP/1.0 404 Not Found" + puts $sock "Content-Type: text/plain" + puts $sock "" + puts $sock "Not a feed." + puts $sock "" + close $sock + return + } + + set table [dict get $feeds $feed] + + + set feed_data {} + foreach uri [db eval "SELECT uri FROM $table ORDER BY ts LIMIT $limit OFFSET $cursor;"] { + lappend feed_data [::json::write object post [::json::write string $uri]] + } + set feed_data [::json::write array {*}$feed_data] + set feed_contents [::json::write object cursor [expr {$cursor + $limit}] feed $feed_data] + + puts $sock "HTTP/1.0 200 OK" + puts $sock "Content-Type: application/json" + puts $sock "Content-Length: [string length $feed_contents]" + puts $sock "" + fconfigure $sock -translation lf + puts -nonewline $sock $feed_contents + close $sock + return + } +} diff --git a/httpd.tcl b/httpd.tcl new file mode 100644 index 0000000..d57e421 --- /dev/null +++ b/httpd.tcl @@ -0,0 +1,82 @@ +# TODO: this is only viable behind a reverse proxy. + +namespace eval ::httpd { + variable log [logger::init httpd] + variable routes + + # routes: {path proc} + proc ::httpd::router {name iroutes} { + variable routes + variable log + if [info exists ::httpd::routes($name)] { + return -code error "Can't create duplicate router!" + } + if {[llength $iroutes] % 2 != 0} { + return -code error "Incorrect number of routes!" + } + + set routes($name) $iroutes + } + + proc ::httpd::listen {parent port} { + socket -server [list ::httpd::accept $parent] $port + } + + proc ::httpd::accept {parent sock addr port} { + variable routes + + fconfigure $sock -translation crlf + gets $sock line + lassign $line method path version + + if {$method != "GET"} { + puts $sock "HTTP/1.0 405 Method Not Allowed" + puts $sock "" + flush $sock + close $sock + return + } + + set query {} + if {[string first ? $path] != -1} { + lassign [split $path ?] path params + foreach param [split $params &] { + set value [join [lassign [split $param =] key] =] + dict lappend query $key $value + } + } + + set headers {} + while {[gets $sock header] != -1} { + if {$header == ""} break + lassign [split header :] key value + set key [string tolower [string trim $key]] + set value [string trim $value] + dict lappend $key $value + } + + foreach {pathpat proc} [set routes($parent)] { + if [string match $pathpat $path] { + if [catch {{*}$proc $parent $sock $path $query $headers} result] { + puts $sock "HTTP/1.0 500 Internal Server Error" + puts $sock "Content-Type: text/plain" + puts $sock "" + puts $sock "Route Handler: $result" + puts $sock "" + flush $sock + close $sock + return + } + return + } + } + + puts $sock "HTTP/1.0 404 Not Found" + puts $sock "Content-Type: text/plain" + puts $sock "" + puts $sock "No route handler available." + puts $sock "" + flush $sock + close $sock + } +} diff --git a/jetstream-stats.tcl b/jetstream-stats.tcl new file mode 100755 index 0000000..947616f --- /dev/null +++ b/jetstream-stats.tcl @@ -0,0 +1,38 @@ +#!/bin/env tclsh +package require logger +package require json + +set times [clock microseconds] +set bufsize 10000 +proc on_ws {sock mode data} { + global times bufsize + + set data [json::json2dict $data] + + if {$mode == "close"} { + ::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws + } + + lappend times [dict get $data time_us] + set times [lrange $times end-[expr {$bufsize - 1}] end] + + set start [lindex $times 0] + set end [lindex $times end] + + set micros_per_N [expr {$end - $start}] + set secs_per_event [expr {$micros_per_N / [llength $times] / 1000000.0}] + set rate [expr {floor(1.0 / $secs_per_event)}] + + if {[llength $times] < $bufsize} { + puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec ([llength $times] post sample)" + } else { + puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec" + } + flush stdout +} + +source ws.tcl +::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws + +vwait nil + diff --git a/jetstream.tcl b/jetstream.tcl new file mode 100644 index 0000000..84aa8bc --- /dev/null +++ b/jetstream.tcl @@ -0,0 +1,47 @@ +namespace eval ::jetstream { + variable log [logger::init tclfeed::jetstream] + proc listen {host db} { + variable log + + ${log}::info "listening to jetstream on wss://$host/subscribe" + + set postfix [sha1::sha1 -hex -- $host] + proc ::jetstream::on_ws_${postfix}_close {} [ + list ::ws::c::connect $host 443 \ + /subscribe?wantedCollections=app.bsky.feed.post on_ws_$postfix] + + proc ::jetstream::on_ws_$postfix {sock mode data} [concat [list set db $db] \; { + variable log + switch -- $mode { + close { + # cursed + [dict get [info frame [info frame]] cmd proc]_close + } + + text { + set data [json::json2dict $data] + if {[dict get $data kind] != "commit"} return + + set uri at://[dict get $data did]/[dict get $data commit collection]/[dict get $data commit rkey] + + switch -- [dict get $data commit operation] { + create { + set text [dict get $data commit record text] + if [regexp -nocase -- {\stcl(/tk)?(,|\s)|^tcl(/tk)?(,|\s)|\stcl(/tk)?$|\stk(,|\s)|^tk(,|\s)|\stk$|\.tcl|tclsh} $text] { + set ts [dict get $data commit record createdAt] + if ![catch {$db eval {INSERT INTO posts (uri, ts) VALUES ($uri, $ts) ON CONFLICT FAIL;}}] { + ${log}::info "new tclpost! https://bsky.app/profile/[dict get $data did]/post/[dict get $data commit rkey]" + } + } + } + delete { + $db eval {DELETE FROM posts WHERE uri = $uri;} + } + } + } + } + }] + + ::ws::c::connect $host 443 /subscribe?wantedCollections=app.bsky.feed.post ::jetstream::on_ws_$postfix + } +} diff --git a/main.tcl b/main.tcl index 947616f..d7ce83c 100755 --- a/main.tcl +++ b/main.tcl @@ -1,38 +1,40 @@ #!/bin/env tclsh package require logger package require json +package require json::write +package require sqlite3 +package require inifile +package require sha1 -set times [clock microseconds] -set bufsize 10000 -proc on_ws {sock mode data} { - global times bufsize +source config.tcl - set data [json::json2dict $data] - - if {$mode == "close"} { - ::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws - } - - lappend times [dict get $data time_us] - set times [lrange $times end-[expr {$bufsize - 1}] end] +source ws.tcl +source jetstream.tcl +source httpd.tcl +source feed.tcl - set start [lindex $times 0] - set end [lindex $times end] +set log [logger::init tclfeed] +${log}::info "tclfeed v0.0.1" - set micros_per_N [expr {$end - $start}] - set secs_per_event [expr {$micros_per_N / [llength $times] / 1000000.0}] - set rate [expr {floor(1.0 / $secs_per_event)}] +#::json::write indented 0 - if {[llength $times] < $bufsize} { - puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec ([llength $times] post sample)" - } else { - puts -nonewline "\33\[2K\rThe jetstream is producing $rate posts/sec" +### DB STUFF ### +sqlite3 db [::ini::value $config database path] +db eval {SELECT count(*) as n FROM sqlite_master WHERE name="posts";} values { + if ![set values(n)] { + ${log}::info "Initializing db!" + db eval {CREATE TABLE posts (uri TEXT NOT NULL PRIMARY KEY, ts TEXT NOT NULL);} } - flush stdout } -source ws.tcl -::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws +### JETSTREAM ### +jetstream::listen [::ini::value $config atproto jetstream_host] db + +### HTTPD ### +httpd::router main { + /xrpc/app.bsky.feed.getFeedSkeleton {::feed::serve_skeleton {tcl posts}} +} +httpd::listen main 3000 vwait nil diff --git a/ws.tcl b/ws.tcl index 98260b5..7b2a018 100644 --- a/ws.tcl +++ b/ws.tcl @@ -82,8 +82,6 @@ namespace eval ::ws { set data [binary format c* $octets] append frame $data - puts [binary encode hex $frame] - puts -nonewline $sock $frame } proc int-doping {sock} { -- 2.45.2