From 9f9f4f965c01b6178d4bbd48bc0b4926632a210c Mon Sep 17 00:00:00 2001 From: Aleteoryx Date: Thu, 21 Nov 2024 17:08:51 -0500 Subject: [PATCH] That's enough code to listen to jetstream --- README.md | 3 + main.tcl | 18 ++++ ws.tcl | 270 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 README.md create mode 100755 main.tcl create mode 100644 ws.tcl diff --git a/README.md b/README.md new file mode 100644 index 0000000..d0de944 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# A `bsky.app` feed for Tcl/Tk posts + +Written in Tcl, using Jetstream. diff --git a/main.tcl b/main.tcl new file mode 100755 index 0000000..e249b33 --- /dev/null +++ b/main.tcl @@ -0,0 +1,18 @@ +#!/bin/env tclsh +package require logger + +set msgs 0 +set start [clock seconds] +proc on_ws {sock mode data} { + global start + global msgs + incr msgs + set dur [expr {[clock seconds] - $start}] + puts -nonewline "Recieved $msgs messages in $dur seconds. [expr {$msgs / ($dur+1)}] msgs/sec\r" + flush stdout +} + +source ws.tcl +::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws +vwait nil + diff --git a/ws.tcl b/ws.tcl new file mode 100644 index 0000000..3610cf4 --- /dev/null +++ b/ws.tcl @@ -0,0 +1,270 @@ +namespace eval ::ws { + proc read_frame {sock} { + fconfigure $sock -blocking 1 + binary scan [read $sock 2] cc byte1 byte2 + set byte1 [expr {$byte1 & 0xFF}] + set byte2 [expr {$byte2 & 0xFF}] + + set p_fin [expr {$byte1 >> 7}] + #set p_rsv1 [expr {$byte1 >> 6 & 1}] + #set p_rsv2 [expr {$byte1 >> 5 & 1}] + #set p_rsv3 [expr {$byte1 >> 4 & 1}] + set p_opcode [expr {$byte1 & 0x0F}] + + set p_mask [expr {$byte2 >> 7}] + set p_len [expr {$byte2 & 0x7F}] + + if {$p_len == 126} { + binary scan [read $sock 2] S p_len + set p_len [expr {$p_len & 0xFFFF}] + } elseif {$p_len == 127} { + binary scan [read $sock 8] W p_len + } + + if {$p_mask} { + binary scan [read $sock 4] c4 p_mask_key + set p_mask_bytes + foreach byte $p_mask_key { + lappend p_mask_bytes [expr {$byte & 0xFF}] + } + } + + set data [read $sock $p_len] + fconfigure $sock -blocking 0 + + if {$p_mask} { + set octets [binary scan $data c*] + for {set i 0} {$i < [llength $octets]} {incr i} { + set j [expr {$i % 4}] + set octet [lget $octets $i] + set mask [lget $p_mask_bytes $j] + lset octets $i [expr {($octet ^ $mask) & 0xFF}] + } + set data [binary format c* $octets] + } + + dict create fin $p_fin opcode $p_opcode len $p_len data $data + } + proc send_frame {sock fin opcode data mask} { + set frame {} + + if {$opcode > 16 || $opcode < 0} { + return -code error "Opcode $opcode invalid. Must be in [0,16]." + } + + append frame [byte format cc \ + [expr {($fin ? 128 : 0) + $opcode}] \ + [expr {($mask ? 128 : 0) + ($len > 65535 ? 127 : ($len > 125 ? 126 : $len))}]] + + if {$len > 65535} { + append frame [byte format W $len] + } elseif {$len > 125} { + append frame [byte format S $len] + } + + binary scan $data c* octets + set octets [lrange $octets 0 $len-1] + if {$mask} { + set mask_bytes [list [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}]] + append frame [binary format c4 $mask_bytes] + + for {set i 0} {$i < [llength $octets]} {incr i} { + set j [expr {$i % 4}] + set octet [lget $octets $i] + set mask [lget $mask_bytes $j] + lset octets $i [expr {($octet ^ $mask) & 0xFF}] + } + } + set data [binary format c* $octets] + append frame $data + + puts -nonewline $sock $frame + } +} + +namespace eval ::ws::c { + variable callback + variable handshake + + variable log [logger::init ::websocket::client] + + proc connect {host port path cb {sec {}}} { + variable callback + variable handshake + if {$sec == ""} { + if {$port in {443 8443}} { + set sec 1 + } else { + set sec 0 + } + } elseif ![string is boolean $sec] { + return -code error "sec must be bool or empty!" + } + + if {$sec} { + if {[info procs ::tls::socket] == {}} {package require tls} + set sock [::tls::socket $host $port] + } else { + set sock [socket $host $port] + } + + if {[info procs ::sha1::sha1] == {}} {package require sha1} + if {[info procs ::base64::encode] == {}} {package require base64} + set ws_key [::base64::encode -maxlen 0 -wrapchar "" [ + string range [::sha1::sha1 [clock microseconds]] 0 15]] + set ws_accept [::base64::encode -maxlen 0 -wrapchar "" [ + ::sha1::sha1 -- [ + string cat $ws_key 258EAFA5-E914-47DA-95CA-C5AB0DC85B11]]] + + fconfigure $sock -translation crlf -blocking 0 + puts $sock "GET $path HTTP/1.1" + puts $sock "Host: $host" + puts $sock "Upgrade: websocket" + puts $sock "Connection: Upgrade" + puts $sock "Sec-WebSocket-Key: $ws_key" + puts $sock "Sec-WebSocket-Version: 13" + puts $sock "" + flush $sock + + fileevent $sock readable [list ::ws::c::int-handshake $sock] + + set callback($sock) $cb + set handshake($sock) [dict create headers {} accept $ws_accept status_line {} status_read 0] + return $sock + } + + variable frag + variable mode + # no, this isn't technically conformant. but jetstream wouldn't be evil to me so + proc int-handshake {sock} { + variable handshake + variable log + variable frag + variable mode + upvar 0 handshake($sock) state + + if ![dict get $state status_read] { + if {[gets $sock status] != -1} { + dict set state status_line $status + dict set state status_read 1 + } + } + + if [dict get $state status_read] { + while {[gets $sock header] != -1} { + if {$header == {}} { + if {[string first "HTTP/1.1 101 " [dict get $state status_line]] != 0} { + ${log}::error "Didn't get \"101 Switching Protocols\" when handshaking $sock." + ${log}::error "Status: [dict get $state status_line]" + foreach {k v} [dict get $state headers] { + ${log}::error "Header $k: $v" + } + + ${log}::error "Returned content: [read $sock]" + + close $sock + return + } + + if {[dict get $state headers sec-websocket-accept] != [dict get $state accept]} { + ${log}::error "Got incorrect Sec-Websocket-Accept while handshaking $sock." + ${log}::error "Expected: [dict get $state accept]" + ${log}::error "Got: [dict get $state headers sec-websocket-accept]" + + close $sock + return + } + + # finally, we can connect! + fconfigure $sock -translation binary + set mode($sock) {} + set frag($sock) {} + fileevent $sock readable [list ::ws::c::int-dispatch $sock] + } else { + lassign [split $header :] key value + set key [string tolower [string trim $key]] + set value [string trim $value] + dict set state headers $key $value + } + } + } + } + proc int-dispatch {sock} { + variable log + variable frag + variable mode + variable callback + + set frame [::ws::read_frame $sock] + + switch -- [dict get $frame opcode] { + 0 { + if {[set mode($sock)] ni {1 2}} { + ${log}::error "Got a continuation frame with no starting frame on $sock! Closing!" + close $sock + } + append frags($sock) [dict get $frame data] + if {[dict get $frame fin]} { + [set callback($sock)] $sock [set mode($sock)] [set frag($sock)] + set frags($sock) {} + } + } + 1 { + if {[dict get $frame fin]} { + set mode($sock) {} + set frag($sock) {} + [set callback($sock)] $sock 1 [dict get $frame data] + } else { + set mode($sock) 1 + set frag($sock) [dict get $frame data] + } + } + 2 { + if {[dict get $frame fin]} { + set mode($sock) {} + set frag($sock) {} + [set callback($sock)] $sock 2 [dict get $frame data] + } else { + set mode($sock) 2 + set frag($sock) [dict get $frame data] + } + } + + 8 { + close $sock + } + 9 { + ::ws::send_frame 0 10 [dict get $frame data] 1 + } + 10 {} + default { + ${log}::error "Unknown opcode [dict get $frame opcode] on $sock! Closing!" + close $sock + } + } + } +} + +set msgs 0 +set start [clock seconds] +proc on_ws {sock mode data} { + global start + global msgs + incr msgs + set dur [expr {[clock seconds] - $start}] + puts -nonewline "Recieved $msgs messages in $dur seconds. [expr {$msgs / ($dur+1)}] msgs/sec\r" + flush stdout + +# switch -- $mode { +# 1 { +# puts "Text frame from $sock: $data"; +# } +# 2 { +# puts "Binary frame from $sock." +# } +# } +} + +::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws +vwait nil + -- 2.45.2