@@ 9,6 9,10 @@ proc on_ws {sock mode data} {
set data [json::json2dict $data]
+ if {$mode == "close"} {
+ ::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
+ }
+
lappend times [dict get $data time_us]
set times [lrange $times end-[expr {$bufsize - 1}] end]
@@ 26,8 30,9 @@ proc on_ws {sock mode data} {
}
flush stdout
}
+
source ws.tcl
::ws::c::connect jetstream2.us-east.bsky.network 443 /subscribe?wantedCollections=app.bsky.feed.post on_ws
+
vwait nil
-# &
@@ 1,7 1,11 @@
namespace eval ::ws {
proc read_frame {sock} {
fconfigure $sock -blocking 1
- binary scan [read $sock 2] cc byte1 byte2
+
+ if ![binary scan [read $sock 2] cc byte1 byte2] {
+ close $sock
+ return {}
+ }
set byte1 [expr {$byte1 & 0xFF}]
set byte2 [expr {$byte2 & 0xFF}]
@@ 36,8 40,8 @@ namespace eval ::ws {
set octets [binary scan $data c*]
for {set i 0} {$i < [llength $octets]} {incr i} {
set j [expr {$i % 4}]
- set octet [lget $octets $i]
- set mask [lget $p_mask_bytes $j]
+ set octet [lindex $octets $i]
+ set mask [lindex $p_mask_bytes $j]
lset octets $i [expr {($octet ^ $mask) & 0xFF}]
}
set data [binary format c* $octets]
@@ 45,41 49,50 @@ namespace eval ::ws {
dict create fin $p_fin opcode $p_opcode len $p_len data $data
}
- proc send_frame {sock fin opcode data mask} {
+ proc send_frame {sock fin opcode data len mask} {
set frame {}
if {$opcode > 16 || $opcode < 0} {
return -code error "Opcode $opcode invalid. Must be in [0,16]."
}
- append frame [byte format cc \
+ append frame [binary format cc \
[expr {($fin ? 128 : 0) + $opcode}] \
[expr {($mask ? 128 : 0) + ($len > 65535 ? 127 : ($len > 125 ? 126 : $len))}]]
if {$len > 65535} {
- append frame [byte format W $len]
+ append frame [binary format W $len]
} elseif {$len > 125} {
- append frame [byte format S $len]
+ append frame [binary format S $len]
}
binary scan $data c* octets
set octets [lrange $octets 0 $len-1]
if {$mask} {
- set mask_bytes [list [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}] [expr {floor(rand() * 256)}]]
+ set mask_bytes [list [expr {int(floor(rand() * 256))}] [expr {int(floor(rand() * 256))}] [expr {int(floor(rand() * 256))}] [expr {int(floor(rand() * 256))}]]
append frame [binary format c4 $mask_bytes]
for {set i 0} {$i < [llength $octets]} {incr i} {
set j [expr {$i % 4}]
- set octet [lget $octets $i]
- set mask [lget $mask_bytes $j]
+ set octet [lindex $octets $i]
+ set mask [lindex $mask_bytes $j]
lset octets $i [expr {($octet ^ $mask) & 0xFF}]
}
}
set data [binary format c* $octets]
append frame $data
+ puts [binary encode hex $frame]
+
puts -nonewline $sock $frame
}
+ proc int-doping {sock} {
+ ping $sock
+ after 10000 [list ::ws::int-doping $sock]
+ }
+ proc ping {sock} {
+ send_frame $sock 0 9 [sha1::sha1 -hex [expr {rand()}]] 40 1
+ }
}
namespace eval ::ws::c {
@@ 163,6 176,7 @@ namespace eval ::ws::c {
${log}::error "Returned content: [read $sock]"
close $sock
+ [set callback($sock)] $sock close [dict get $frame data]
return
}
@@ 172,6 186,7 @@ namespace eval ::ws::c {
${log}::error "Got: [dict get $state headers sec-websocket-accept]"
close $sock
+ [set callback($sock)] $sock close [dict get $frame data]
return
}
@@ 180,6 195,7 @@ namespace eval ::ws::c {
set mode($sock) {}
set frag($sock) {}
fileevent $sock readable [list ::ws::c::int-dispatch $sock]
+ ::ws::int-doping $sock
} else {
lassign [split $header :] key value
set key [string tolower [string trim $key]]
@@ 197,6 213,11 @@ namespace eval ::ws::c {
set frame [::ws::read_frame $sock]
+ if {$frame == {}} {
+ ${log}::warn "$sock unexpectedly closed!"
+ [set callback($sock)] $sock close [dict get $frame data]
+ }
+
switch -- [dict get $frame opcode] {
0 {
if {[set mode($sock)] ni {1 2}} {
@@ 213,9 234,9 @@ namespace eval ::ws::c {
if {[dict get $frame fin]} {
set mode($sock) {}
set frag($sock) {}
- [set callback($sock)] $sock 1 [dict get $frame data]
+ [set callback($sock)] $sock text [dict get $frame data]
} else {
- set mode($sock) 1
+ set mode($sock) text
set frag($sock) [dict get $frame data]
}
}
@@ 223,23 244,27 @@ namespace eval ::ws::c {
if {[dict get $frame fin]} {
set mode($sock) {}
set frag($sock) {}
- [set callback($sock)] $sock 2 [dict get $frame data]
+ [set callback($sock)] $sock binary [dict get $frame data]
} else {
- set mode($sock) 2
+ set mode($sock) binary
set frag($sock) [dict get $frame data]
}
}
8 {
close $sock
+ [set callback($sock)] $sock close [dict get $frame data]
}
9 {
- ::ws::send_frame 0 10 [dict get $frame data] 1
+ ::ws::send_frame 0 10 [dict get $frame data] [dict get $frame len] 1
+ }
+ 10 {
+ # TODO: kill connection on missed ping? but jetstream doesn't send any it looks like
}
- 10 {}
default {
${log}::error "Unknown opcode [dict get $frame opcode] on $sock! Closing!"
close $sock
+ [set callback($sock)] $sock close [dict get $frame data]
}
}
}