Skip to content

Commit 0cccf33

Browse files
committed
implemented a basic tcp server and tests
1 parent e976478 commit 0cccf33

7 files changed

Lines changed: 106 additions & 45 deletions

File tree

pixie/io.pxi

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@
111111
IDisposable
112112
(-dispose! [this]
113113
(set-buffer-count! buffer idx)
114+
(write downstream buffer))
115+
IFlushableStream
116+
(flush [this]
117+
(set-buffer-count! buffer idx)
118+
(set-field! this :idx 0)
114119
(write downstream buffer)))
115120

116121
(deftype BufferedInputStream [upstream idx buffer]
@@ -124,16 +129,21 @@
124129
val))
125130
IDisposable
126131
(-dispose! [this]
127-
(dispose! upstream)
128132
(dispose! buffer)))
129133

130-
(defn buffered-output-stream [downstream size]
131-
(->BufferedOutputStream downstream 0 (buffer size)))
134+
(defn buffered-output-stream
135+
([downstream]
136+
(buffered-output-stream downstream DEFAULT-BUFFER-SIZE))
137+
([downstream size]
138+
(->BufferedOutputStream downstream 0 (buffer size))))
132139

133-
(defn buffered-input-stream [upstream size]
134-
(let [b (buffer size)]
135-
(set-buffer-count! b size)
136-
(->BufferedInputStream upstream size b)))
140+
(defn buffered-input-stream
141+
([upstream]
142+
(buffered-input-stream upstream DEFAULT-BUFFER-SIZE))
143+
([upstream size]
144+
(let [b (buffer size)]
145+
(set-buffer-count! b size)
146+
(->BufferedInputStream upstream size b))))
137147

138148
(defn throw-on-error [result]
139149
(when (neg? result)
@@ -180,25 +190,6 @@
180190
(defn run-command [command]
181191
(st/apply-blocking io-blocking/run-command command))
182192

183-
(comment
184-
185-
(defn tcp-server [ip port on-connection]
186-
(assert (string? ip) "Ip should be a string")
187-
(assert (integer? port) "Port should be a int")
188-
(let [server (uv/uv_tcp_t)
189-
bind-addr (uv/sockaddr_in)
190-
_ (uv/throw-on-error (uv/uv_ip4_addr ip port bind-addr))
191-
on-new-connetion (atom nil)]
192-
(reset! on-new-connetion
193-
(ffi/ffi-prep-callback
194-
uv/uv_connection_cb
195-
(fn [server status]
196-
(when (not (= status -1))
197-
(println "Got Client!!!!!!!")))))
198-
(uv/uv_tcp_init (uv/uv_default_loop) server)
199-
(uv/uv_tcp_bind server bind-addr 0)
200-
(uv/throw-on-error (uv/uv_listen server 128 @on-new-connetion))
201-
(st/yield-control))))
202193

203194
(comment
204195
(st/apply-blocking println "FROM OTHER THREAD <---!!!!!")

pixie/io/tcp.pxi

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44
[pixie.uv :as uv]
55
[pixie.ffi :as ffi]))
66

7-
(defrecord TCPServer [ip port on-connect uv-server bind-addr on-connection-cb])
7+
(defrecord TCPServer [ip port on-connect uv-server bind-addr on-connection-cb]
8+
IDisposable
9+
(-dispose! [this]
10+
(uv/uv_close uv-server st/close_cb)
11+
(dispose! @on-connection-cb)
12+
(dispose! bind-addr)))
813

914
(defn -prep-uv-buffer-fn [buf read-bytes]
1015
(ffi/ffi-prep-callback
1116
uv/uv_alloc_cb
1217
(fn [handle suggested-size uv-buf]
1318
(try
1419
(let [casted (ffi/cast uv-buf uv/uv_buf_t)]
15-
(println "Alloc " handle suggested-size buf read-bytes)
1620
(ffi/set! casted :base buf)
1721
(ffi/set! casted :len (min suggested-size
1822
(buffer-capacity buf)
@@ -30,37 +34,40 @@
3034
(reset! read-cb (ffi/ffi-prep-callback
3135
uv/uv_read_cb
3236
(fn [stream nread uv-buf]
33-
(println "-<<< nread <-- " nread)
3437
(set-buffer-count! buffer nread)
3538
(try
3639
(dispose! alloc-cb)
3740
(dispose! @read-cb)
3841
;(dispose! uv-buf)
3942
(uv/uv_read_stop stream)
40-
(st/run-and-process k nread)
43+
(st/run-and-process k (or
44+
(st/exception-on-uv-error nread)
45+
nread))
4146
(catch ex
4247
(println ex))))))
4348
(uv/uv_read_start uv-client alloc-cb @read-cb)))))
4449
IOutputStream
4550
(write [this buffer]
4651
(let [write-cb (atom nil)
4752
uv_write (uv/uv_write_t)]
48-
(println "writing " (count buffer))
4953
(ffi/set! uv-write-buf :base buffer)
5054
(ffi/set! uv-write-buf :len (count buffer))
5155
(st/call-cc
5256
(fn [k]
5357
(reset! write-cb (ffi/ffi-prep-callback
5458
uv/uv_write_cb
5559
(fn [req status]
56-
(println status "<<-- status")
5760
(try
5861
(dispose! @write-cb)
5962
;(uv/uv_close uv_write st/close_cb)
6063
(st/run-and-process k status)
6164
(catch ex
6265
(println ex))))))
63-
(uv/uv_write uv_write uv-client uv-write-buf 1 @write-cb))))))
66+
(uv/uv_write uv_write uv-client uv-write-buf 1 @write-cb)))))
67+
IDisposable
68+
(-dispose! [this]
69+
(dispose! uv-write-buf)
70+
(uv/uv_close uv-client st/close_cb)))
6471

6572
(defn launch-tcp-client-from-server [svr]
6673
(assert (instance? TCPServer svr) "Requires a TCPServer as the first argument")
@@ -94,3 +101,27 @@
94101
(uv/throw-on-error (uv/uv_listen server 128 @on-new-connetion))
95102
(st/yield-control)
96103
tcp-server))
104+
105+
106+
(defn tcp-client [ip port]
107+
(let [client-addr (uv/sockaddr_in)
108+
uv-connect (uv/uv_connect_t)
109+
client (uv/uv_tcp_t)
110+
cb (atom nil)]
111+
(uv/throw-on-error (uv/uv_ip4_addr ip port client-addr))
112+
(uv/uv_tcp_init (uv/uv_default_loop) client)
113+
(st/call-cc (fn [k]
114+
(reset! cb (ffi/ffi-prep-callback
115+
uv/uv_connect_cb
116+
(fn [_ status]
117+
(try
118+
(dispose! @cb)
119+
(dispose! uv-connect)
120+
(dispose! client-addr)
121+
(st/run-and-process k (or (st/exception-on-uv-error status)
122+
(->TCPStream client (uv/uv_buf_t))))
123+
(catch ex
124+
(println ex))))))
125+
(uv/uv_tcp_connect uv-connect client client-addr @cb)))
126+
127+
))

pixie/stacklets.pxi

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,28 @@
1616

1717
;; Yield
1818

19+
(defrecord ThrowException [ex])
20+
1921
(defn run-and-process
2022
([k]
2123
(run-and-process k nil))
2224
([k val]
2325
(let [[h f] (k val)]
2426
(f h))))
2527

28+
(defn exception-on-uv-error [result]
29+
(when (neg? result)
30+
(->ThrowException (str "UV Error: " (uv/uv_err_name result)))))
31+
32+
2633
(defn call-cc [f]
2734
(let [frames (-get-current-var-frames nil)
2835
[h val] (@stacklet-loop-h f)]
2936
(reset! stacklet-loop-h h)
3037
(-set-current-var-frames nil frames)
31-
val))
38+
(if (instance? ThrowException val)
39+
(throw (:ex val))
40+
val)))
3241

3342
(defn -run-later [f]
3443
(let [a (uv/uv_async_t)

pixie/stdlib.pxi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2324,4 +2324,4 @@ Calling this function on something that is not ISeqable returns a seq with that
23242324
"Returns the largest of all the arguments to this function. Assumes arguments are numeric"
23252325
([x] x)
23262326
([x y] (if (> x y) x y))
2327-
([x y & zs] (apply min (min x y) zs)))
2327+
([x y & zs] (apply max (max x y) zs)))

pixie/streams.pxi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
(ns pixie.streams)
22

3+
(defprotocol IFlushableStream
4+
(flush [this] "Flushes all buffers in this stream and applies writes to any parent streams"))
5+
36
(defprotocol IInputStream
47
(read [this buffer len] "Reads multiple bytes into a buffer, returns the number of bytes read"))
58

pixie/uv.pxi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@
124124
(f/defconst UV_DIRENT_CHAR)
125125
(f/defconst UV_DIRENT_BLOCK)
126126

127+
(f/defconst UV_EOF)
128+
127129
(f/defcstruct uv_dirent_t [:name
128130
:type])
129131

tests/pixie/tests/io/test-tcp.pxi

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,40 @@
11
(ns pixie.test.io.test-tcp
22
(:require [pixie.io.tcp :refer :all]
3-
[pixie.io :refer [read write]]
3+
[pixie.io :refer [buffered-input-stream buffered-output-stream read-byte write-byte]]
4+
[pixie.streams :refer :all]
45
[pixie.stacklets :as st]
5-
[pixie.async :as async]))
6+
[pixie.async :as async]
7+
[pixie.uv :as uv]
8+
[pixie.test :refer :all]))
69

7-
(defn on-client [conn]
8-
(let [b (buffer 1024)]
9-
(read conn b 1024)
10-
(write conn b)
11-
(dotimes [x 1000]
12-
(st/yield-control))
13-
(println "Done Writing..")))
10+
(deftest test-echo-server
11+
(let [client-done (async/promise)
12+
on-client (fn on-client [conn]
13+
(let [in (buffered-input-stream conn)
14+
out (buffered-output-stream conn)]
15+
(try
16+
(loop []
17+
(let [val (read-byte in)]
18+
(write-byte out val)
19+
(flush out)
20+
(recur)))
21+
(catch ex
22+
(dispose! in)
23+
(dispose! out)
1424

15-
(tcp-server "0.0.0.0" 4242 on-client)
25+
(dispose! conn)
26+
(client-done true)))))
27+
28+
server (tcp-server "0.0.0.0" 4242 on-client)]
29+
30+
(let [client-stream (tcp-client "127.0.0.1" 4242)
31+
in (buffered-input-stream client-stream)
32+
out (buffered-output-stream client-stream)]
33+
34+
(dotimes [x 255]
35+
(write-byte out x)
36+
(flush out)
37+
(assert= x (read-byte in)))
38+
(dispose! client-stream)
39+
(assert @client-done)
40+
(dispose! server))))

0 commit comments

Comments
 (0)