~aleteoryx/tclfeed-bsky

9f9f4f965c01b6178d4bbd48bc0b4926632a210c — Aleteoryx 30 days ago
That's enough code to listen to jetstream
3 files changed, 291 insertions(+), 0 deletions(-)

A README.md
A main.tcl
A ws.tcl
A  => README.md +3 -0
@@ 1,3 @@
# A `bsky.app` feed for Tcl/Tk posts

Written in Tcl, using Jetstream.

A  => main.tcl +18 -0
@@ 1,18 @@
#!/bin/env tclsh
package require logger

set msgs 0
set start [clock seconds]
proc on_ws {sock mode data} {
  global start
  global msgs
  incr msgs
  set dur [expr {[clock seconds] - $start}]
  puts -nonewline "Recieved $msgs messages in $dur seconds. [expr {$msgs / ($dur+1)}] msgs/sec\r"
  flush stdout
}

source ws.tcl
::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
vwait nil


A  => ws.tcl +270 -0
@@ 1,270 @@
namespace eval ::ws {
  proc read_frame {sock} {
    fconfigure $sock -blocking 1
    binary scan [read $sock 2] cc byte1 byte2
    set byte1 [expr {$byte1 & 0xFF}]
    set byte2 [expr {$byte2 & 0xFF}]

    set p_fin [expr {$byte1 >> 7}]
    #set p_rsv1 [expr {$byte1 >> 6 & 1}]
    #set p_rsv2 [expr {$byte1 >> 5 & 1}]
    #set p_rsv3 [expr {$byte1 >> 4 & 1}]
    set p_opcode [expr {$byte1 & 0x0F}]

    set p_mask [expr {$byte2 >> 7}]
    set p_len [expr {$byte2 & 0x7F}]

    if {$p_len == 126} {
      binary scan [read $sock 2] S p_len
      set p_len [expr {$p_len & 0xFFFF}]
    } elseif {$p_len == 127} {
      binary scan [read $sock 8] W p_len
    }

    if {$p_mask} {
      binary scan [read $sock 4] c4 p_mask_key
      set p_mask_bytes
      foreach byte $p_mask_key {
        lappend p_mask_bytes [expr {$byte & 0xFF}]
      }
    }

    set data [read $sock $p_len]
    fconfigure $sock -blocking 0

    if {$p_mask} {
      set octets [binary scan $data c*]
      for {set i 0} {$i < [llength $octets]} {incr i} {
        set j [expr {$i % 4}]
        set octet [lget $octets $i]
        set mask [lget $p_mask_bytes $j]
        lset octets $i [expr {($octet ^ $mask) & 0xFF}]
      }
      set data [binary format c* $octets]
    }

    dict create fin $p_fin opcode $p_opcode len $p_len data $data
  }
  proc send_frame {sock fin opcode data mask} {
    set frame {}

    if {$opcode > 16 || $opcode < 0} {
      return -code error "Opcode $opcode invalid. Must be in [0,16]."
    }

    append frame [byte format cc \
      [expr {($fin ? 128 : 0) + $opcode}] \
      [expr {($mask ? 128 : 0) + ($len > 65535 ? 127 : ($len > 125 ? 126 : $len))}]]

    if {$len > 65535} {
      append frame [byte format W $len]
    } elseif {$len > 125} {
      append frame [byte format S $len]
    }

    binary scan $data c* octets
    set octets [lrange $octets 0 $len-1]
    if {$mask} {
      set mask_bytes [list [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}]]
      append frame [binary format c4 $mask_bytes]

      for {set i 0} {$i < [llength $octets]} {incr i} {
        set j [expr {$i % 4}]
        set octet [lget $octets $i]
        set mask [lget $mask_bytes $j]
        lset octets $i [expr {($octet ^ $mask) & 0xFF}]
      }
    }
    set data [binary format c* $octets]
    append frame $data

    puts -nonewline $sock $frame
  }
}

namespace eval ::ws::c {
  variable callback
  variable handshake

  variable log [logger::init ::websocket::client]

  proc connect {host port path cb {sec {}}} {
    variable callback
    variable handshake
    if {$sec == ""} {
      if {$port in {443 8443}} {
        set sec 1
      } else {
        set sec 0
      }
    } elseif ![string is boolean $sec] {
      return -code error "sec must be bool or empty!"
    }

    if {$sec} {
      if {[info procs ::tls::socket] == {}} {package require tls}
      set sock [::tls::socket $host $port]
    } else {
      set sock [socket $host $port]
    }

    if {[info procs ::sha1::sha1] == {}} {package require sha1}
    if {[info procs ::base64::encode] == {}} {package require base64}
    set ws_key [::base64::encode -maxlen 0 -wrapchar "" [
      string range [::sha1::sha1 [clock microseconds]] 0 15]]
    set ws_accept [::base64::encode -maxlen 0 -wrapchar "" [
      ::sha1::sha1 -- [
        string cat $ws_key 258EAFA5-E914-47DA-95CA-C5AB0DC85B11]]]

    fconfigure $sock -translation crlf -blocking 0
    puts $sock "GET $path HTTP/1.1"
    puts $sock "Host: $host"
    puts $sock "Upgrade: websocket"
    puts $sock "Connection: Upgrade"
    puts $sock "Sec-WebSocket-Key: $ws_key"
    puts $sock "Sec-WebSocket-Version: 13"
    puts $sock ""
    flush $sock

    fileevent $sock readable [list ::ws::c::int-handshake $sock]

    set callback($sock) $cb
    set handshake($sock) [dict create headers {} accept $ws_accept status_line {} status_read 0]
    return $sock
  }

  variable frag
  variable mode
  # no, this isn't technically conformant. but jetstream wouldn't be evil to me so
  proc int-handshake {sock} {
    variable handshake
    variable log
    variable frag
    variable mode
    upvar 0 handshake($sock) state

    if ![dict get $state status_read] {
      if {[gets $sock status] != -1} {
        dict set state status_line $status
        dict set state status_read 1
      }
    }

    if [dict get $state status_read] {
      while {[gets $sock header] != -1} {
        if {$header == {}} {
          if {[string first "HTTP/1.1 101 " [dict get $state status_line]] != 0} {
            ${log}::error "Didn't get \"101 Switching Protocols\" when handshaking $sock."
            ${log}::error "Status: [dict get $state status_line]"
            foreach {k v} [dict get $state headers] {
              ${log}::error "Header $k: $v"
            }

            ${log}::error "Returned content: [read $sock]"

            close $sock
            return
          }

          if {[dict get $state headers sec-websocket-accept] != [dict get $state accept]} {
            ${log}::error "Got incorrect Sec-Websocket-Accept while handshaking $sock."
            ${log}::error "Expected: [dict get $state accept]"
            ${log}::error "Got:      [dict get $state headers sec-websocket-accept]"

            close $sock
            return
          }

          # finally, we can connect!
          fconfigure $sock -translation binary
          set mode($sock) {}
          set frag($sock) {}
          fileevent $sock readable [list ::ws::c::int-dispatch $sock]
        } else {
          lassign [split $header :] key value
          set key [string tolower [string trim $key]]
          set value [string trim $value]
          dict set state headers $key $value
        }
      }
    }
  }
  proc int-dispatch {sock} {
    variable log
    variable frag
    variable mode
    variable callback

    set frame [::ws::read_frame $sock]

    switch -- [dict get $frame opcode] {
      0 {
        if {[set mode($sock)] ni {1 2}} {
          ${log}::error "Got a continuation frame with no starting frame on $sock! Closing!"
          close $sock
        }
        append frags($sock) [dict get $frame data]
        if {[dict get $frame fin]} {
          [set callback($sock)] $sock [set mode($sock)] [set frag($sock)]
          set frags($sock) {}
        }
      }
      1 {
        if {[dict get $frame fin]} {
          set mode($sock) {}
          set frag($sock) {}
          [set callback($sock)] $sock 1 [dict get $frame data]
        } else {
          set mode($sock) 1
          set frag($sock) [dict get $frame data]
        }
      }
      2 {
        if {[dict get $frame fin]} {
          set mode($sock) {}
          set frag($sock) {}
          [set callback($sock)] $sock 2 [dict get $frame data]
        } else {
          set mode($sock) 2
          set frag($sock) [dict get $frame data]
        }
      }

      8 {
        close $sock
      }
      9 {
        ::ws::send_frame 0 10 [dict get $frame data] 1
      }
      10 {}
      default {
        ${log}::error "Unknown opcode [dict get $frame opcode] on $sock! Closing!"
        close $sock
      }
    }
  }
}

set msgs 0
set start [clock seconds]
proc on_ws {sock mode data} {
  global start
  global msgs
  incr msgs
  set dur [expr {[clock seconds] - $start}]
  puts -nonewline "Recieved $msgs messages in $dur seconds. [expr {$msgs / ($dur+1)}] msgs/sec\r"
  flush stdout

#  switch -- $mode {
#    1 {
#      puts "Text frame from $sock: $data";
#    }
#    2 {
#      puts "Binary frame from $sock."
#    }
#  }
}

::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
vwait nil