comparison net.rhope @ 142:7bbdc034e347

Fix some bugs. Get basic network code working (epoll listener + accept connections). Start porting webserver.
author Mike Pavone <pavone@retrodev.com>
date Sun, 21 Nov 2010 16:33:17 -0500
parents
children ff00538cd818
comparison
equal deleted inserted replaced
141:f2cb85c53ced 142:7bbdc034e347
1
2 Foreign C:libc
3 {
4 setsockopt[sockfd(Int32,Naked),level(Int32,Naked),optname(Int32,Naked),optval(Int32,Raw Pointer),optlen(Int32,Naked):out(Int32,Naked)]
5 listen[sockfd(Int32,Naked),backlog(Int32,Naked):out(Int32,Naked)]
6 send[sockfd(Int32,Naked),data(Array,Raw Pointer),datalen(Int64,Naked):out(Int32,Naked)]
7 recv[sockfd(Int32,Naked),data(Array,Raw Pointer,Mutable),maxlen(Int64,Naked):out(Int32,Naked),data]
8 fcntl[fd(Int32,Naked),command(Int32,Naked),value(Int64,Naked):out(Int32,Naked)]
9 pipe[descriptors(Array,Raw Pointer,Mutable):out(Int32,Naked),descriptors]
10 epoll_create[size(Int32,Naked):status(Int32,Naked)]
11 epoll_ctl[epfd(Int32,Naked),operation(Int32,Naked),fd(Int32,Naked),event(epoll_event,Raw Pointer):status(Int32,Naked)]
12 epoll_wait[epfd(Int32,Naked),events(Array,Raw Pointer,Mutable),maxevents(Int32,Naked),timeout(Int32,Naked):out(Int32,Naked),events]
13 }
14
15
16 //Note: port numbers would more properly be UInt16, think about changing later
17 Foreign C:runtime
18 {
19 _internal_bindnewsocket[port(Int32,Naked),setreuse(Int32,Naked):socket(Int32,Naked)]
20 _internal_accept[sockfd(Int32,Naked),addrbuf(Array,Raw Pointer,Mutable),buflen(Int32,Naked):consock(Int32,Naked),addrbuf]
21 }
22
23 Blueprint epoll_event
24 {
25 events(UInt32,Naked)
26 data(Int64,Naked)
27 }
28
29 //EPOLLIN 1
30 //EPOLLOUT 4
31 //EPOLLPRI 2
32 //EPOLL_CTL_ADD 1
33 //EPOLL_CTL_DEL 2
34 //EPOLL_CTL_MOD 3
35
36 _Add New FDs[epfd,pipefd:out]
37 {
38 count,rdata <- read[pipefd, _internal_array_allocnaked[2,Int32()], 8i64]
39 ,out <- If[[count]=[8i64]]
40 {
41 data <- [rdata]Length <<[2]
42 epoll_ctl[epfd, 1i32, [data]Index[0], [[Build[epoll_event()]]events <<[Abs UInt[[data]Index[1]]]]data <<[Int64[[data]Index[0]]]]
43 { out <- _Add New FDs[epfd,pipefd] }
44 }{
45 //TODO: Properly deal with the case when count > 0 but < 8
46 If[[count]>[0i64]]
47 {
48 Print[["read of listener pipe returned unexpected number of bytes: "]Append[String[count]]]
49 }
50 }
51 }
52
53 _Get IO Context[fd:out,notfound] uses socklisten
54 {
55 out,notfound <- [socklisten::fdlookup]Index[fd]
56 }
57
58 _Handle Events[epfd,pipefd,buf,cur:out]
59 {
60 event,out <- [buf]Index[cur]
61 {
62 If[[event]data >>]
63 {
64 activefd <- Trunc Int32[[event]data >>]
65 epoll_ctl[epfd, 2, activefd, Build[epoll_event()]]
66 {
67 If[[[event]events >>]&[8216u32]]
68 { res <- No }
69 { res <- Yes }
70 ct,cont <- _Get IO Context[activefd]
71 {
72 }{ Print["Could not find context for IO event"] }
73 ,cont <- Resume[ct,res]
74 {
75 cont <- Yield[]
76 }{
77 Print["could not resume context for IO event"]
78 }
79 }
80 }{
81 cont <- _Add New FDs[epfd,pipefd]
82 }
83 Val[cont]
84 {
85 out <- _Handle Events[epfd,pipefd,buf,[cur]+[1]]
86 }
87 }
88 }
89
90 _Wait Active[epfd,pipefd,buf]
91 {
92 workaround <- Yield[]
93 Val[workaround]
94 {
95 count,newbuf <- epoll_wait[epfd,buf,[buf]Storage >>,-1]
96 If[[count]=[-1]]
97 {
98 Print["epoll_wait returned error"]
99 }{
100 If[[count]=[0]]
101 {
102 //Shouldn't happen normally, but perhaps if there was a signal
103 _Wait Active[epfd,pipefd,buf]
104 }{
105 _Handle Events[epfd,pipefd,[newbuf]Length <<[count],0]
106 { _Wait Active[epfd,pipefd,buf] }
107 }
108 }
109 }
110 }
111
112 _Sock Listener[pipefd]
113 {
114 epfd <- epoll_create[16]
115 If[[epfd]=[-1]]
116 {
117 Print["Error creating epoll file descriptor"]
118 }{
119 If[[epoll_ctl[epfd,1i32,pipefd,[[Build[epoll_event()]]events <<[1u32]]data <<[0i64]]]=[-1]]
120 {
121 Print["Error adding pipe to epoll fd"]
122 }{
123 _Wait Active[epfd, pipefd, _internal_array_allocnaked[8,epoll_event()]]
124 }
125 }
126 }
127
128 Globals socklisten
129 {
130 listener started <- No
131 fdlookup <- ()
132 pipefd <- -1
133 }
134
135 _Add FD to Listener[fd,context:pipefd,err] uses socklisten
136 {
137 If[socklisten::listener started]
138 {
139 pipefd <- socklisten::pipefd
140 do add <- Yes
141 }{
142 //Calling a function with side effects inside a transaction is BAD
143 //Need to do something about this
144 ret,des <- pipe[[Array[]]Set[1, 0i32]]
145 If[[ret]=[-1]]
146 {
147 err <- "Error creating pipe for waking up socket listener"
148 }{
149 socklisten::pipefd <- [des]Index[1]
150 pipefd <- [des]Index[1]
151 //fcntl[fd, F_SETFL, O_NONBLOCK]
152 //Set both ends of the pipe to non blocking
153 fcntl[[des]Index[0], 4i32, 2048i64]
154 fcntl[[des]Index[1], 4i32, 2048i64]
155 socklisten::listener started <- Yes
156 Call Async[_Sock Listener[[des]Index[0],?]]
157 do add <- Yes
158 }
159 }
160 Val[do add]
161 {
162 socklisten::fdlookup <- [socklisten::fdlookup]Set[fd, context]
163 }
164 }
165
166 _Write to Listener Pipe[pipefd,data]
167 {
168 res <- write[pipefd, data, 8i64]
169 If[[res]!=[8i32]]
170 {
171 workaround <- Yield[]
172 Val[workaround]
173 { _Write to Listener Pipe[pipefd,data] }
174 }
175 }
176
177 _Wait for IO[fd,type,context]
178 {
179 _Add FD to Listener[fd,context]
180 {
181 _Write to Listener Pipe[~,[[Array[]]Append[fd]]Append[type]]
182 }{
183 Print[~]
184 { Resume[context,No] }
185 }
186 }
187
188 Wait for IO[fd,type:out]
189 {
190 out <- Pause[_Wait for IO[fd,type,?]]
191 }
192
193 _Do Con Call[newfd,address,tocall]
194 {
195 [tocall]Call[newfd,address]
196 }
197
198 _Null Term[raw str,cur:out]
199 {
200 [raw str]Index[cur]
201 {
202 If[[~]=[0u8]]
203 {
204 out <- String[[raw str]Length <<[cur]]
205 }{
206 out <- _Null Term[raw str, [cur]+[1]]
207 }
208 }{
209 out <- String[raw str]
210 }
211 }
212
213 _Port Wait[fd,tocall:out]
214 {
215 con <- _internal_accept[fd,_internal_array_allocnaked[40,UInt8()],40] {}
216 { address <- _Null Term[[~]Length <<[40], 0i32] }
217 If[[con]=[-1]]
218 {
219 If[Wait for IO[fd, 1i32]]
220 {
221 out <- _Port Wait[fd,tocall]
222 }{
223 Print["Error waiting for connection"]
224 }
225 }{
226 Call Async[_Do Con Call[con,address,tocall,?]]
227 {
228 out <- _Port Wait[fd,tocall]
229 }
230 }
231 }
232
233 Listen on Port[port(Int32),tocall:out]
234 {
235 fd <- _internal_bindnewsocket[port,1]
236 //fcntl[fd, F_SETFL, O_NONBLOCK]
237 //Set listen socket to non blocking
238 If[[fd]=[-1]]
239 { out <- No }
240 {
241 fcntl[fd, 4i32, 2048i64]
242 { listen[fd,8]
243 { out <- Call Async[_Port Wait[fd,tocall,?]] }}
244 }
245 }
246
247 //This effectively leaks a context and thus any data on the stack of that context
248 //Need either handle cleanup of contexts or find a better way to accomplish this
249 Wait Forever[]
250 {
251 Pause[Val[?]]
252 }
253