Mercurial > repos > blastem
comparison event_log.c @ 1958:9c01945b5d20
Use zlib to compress event log streams
author | Mike Pavone <pavone@retrodev.com> |
---|---|
date | Sat, 02 May 2020 17:33:23 -0700 |
parents | ba06346611a1 |
children | bd70f1e15684 |
comparison
equal
deleted
inserted
replaced
1957:ba06346611a1 | 1958:9c01945b5d20 |
---|---|
13 #include <errno.h> | 13 #include <errno.h> |
14 #include "event_log.h" | 14 #include "event_log.h" |
15 #include "util.h" | 15 #include "util.h" |
16 #include "blastem.h" | 16 #include "blastem.h" |
17 #include "saves.h" | 17 #include "saves.h" |
18 #include "zlib/zlib.h" | |
18 | 19 |
19 enum { | 20 enum { |
20 CMD_GAMEPAD_DOWN, | 21 CMD_GAMEPAD_DOWN, |
21 CMD_GAMEPAD_UP, | 22 CMD_GAMEPAD_UP, |
22 }; | 23 }; |
23 | 24 |
24 static uint8_t active, fully_active; | 25 static uint8_t active, fully_active; |
25 static FILE *event_file; | 26 static FILE *event_file; |
26 static serialize_buffer buffer; | 27 static serialize_buffer buffer; |
28 static uint8_t *compressed; | |
29 static size_t compressed_storage; | |
30 static z_stream output_stream; | |
31 static uint32_t last; | |
32 | |
33 static void event_log_common_init(void) | |
34 { | |
35 init_serialize(&buffer); | |
36 compressed_storage = 128*1024; | |
37 compressed = malloc(compressed_storage); | |
38 deflateInit(&output_stream, 9); | |
39 output_stream.avail_out = compressed_storage; | |
40 output_stream.next_out = compressed; | |
41 output_stream.avail_in = 0; | |
42 output_stream.next_in = buffer.data; | |
43 last = 0; | |
44 active = 1; | |
45 } | |
46 | |
47 static uint8_t multi_count; | |
48 static size_t multi_start; | |
49 static void finish_multi(void) | |
50 { | |
51 buffer.data[multi_start] |= multi_count - 2; | |
52 multi_count = 0; | |
53 } | |
54 | |
55 static void file_finish(void) | |
56 { | |
57 fwrite(compressed, 1, output_stream.next_out - compressed, event_file); | |
58 output_stream.next_out = compressed; | |
59 output_stream.avail_out = compressed_storage; | |
60 int result = deflate(&output_stream, Z_FINISH); | |
61 if (Z_STREAM_END != result) { | |
62 fatal_error("Final deflate call returned %d\n", result); | |
63 } | |
64 fwrite(compressed, 1, output_stream.next_out - compressed, event_file); | |
65 fclose(event_file); | |
66 } | |
27 | 67 |
28 static const char el_ident[] = "BLSTEL\x02\x00"; | 68 static const char el_ident[] = "BLSTEL\x02\x00"; |
29 static uint32_t last; | |
30 void event_log_file(char *fname) | 69 void event_log_file(char *fname) |
31 { | 70 { |
32 event_file = fopen(fname, "wb"); | 71 event_file = fopen(fname, "wb"); |
33 if (!event_file) { | 72 if (!event_file) { |
34 warning("Failed to open event file %s for writing\n", fname); | 73 warning("Failed to open event file %s for writing\n", fname); |
35 return; | 74 return; |
36 } | 75 } |
37 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file); | 76 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file); |
38 init_serialize(&buffer); | 77 event_log_common_init(); |
39 active = fully_active = 1; | 78 fully_active = 1; |
40 last = 0; | 79 atexit(file_finish); |
41 } | 80 } |
42 | 81 |
43 static int listen_sock, remotes[7]; | 82 static int listen_sock, remotes[7]; |
44 static int num_remotes; | 83 static int num_remotes; |
45 void event_log_tcp(char *address, char *port) | 84 void event_log_tcp(char *address, char *port) |
68 warning("Failed to listen for event log remotes on %s:%s\n", address, port); | 107 warning("Failed to listen for event log remotes on %s:%s\n", address, port); |
69 socket_close(listen_sock); | 108 socket_close(listen_sock); |
70 goto cleanup_address; | 109 goto cleanup_address; |
71 } | 110 } |
72 socket_blocking(listen_sock, 0); | 111 socket_blocking(listen_sock, 0); |
73 active = 1; | 112 event_log_common_init(); |
74 cleanup_address: | 113 cleanup_address: |
75 freeaddrinfo(result); | 114 freeaddrinfo(result); |
76 } | 115 } |
77 | 116 |
78 static uint8_t *system_start; | 117 static uint8_t *system_start; |
88 if (name_len > 255) { | 127 if (name_len > 255) { |
89 name_len = 255; | 128 name_len = 255; |
90 } | 129 } |
91 save_int8(&buffer, name_len); | 130 save_int8(&buffer, name_len); |
92 save_buffer8(&buffer, name, strlen(name)); | 131 save_buffer8(&buffer, name, strlen(name)); |
93 if (!fully_active) { | 132 if (listen_sock) { |
94 system_start = malloc(buffer.size); | 133 system_start = malloc(buffer.size); |
95 system_start_size = buffer.size; | 134 system_start_size = buffer.size; |
96 memcpy(system_start, buffer.data, buffer.size); | 135 memcpy(system_start, buffer.data, buffer.size); |
97 buffer.size = 0; | 136 } else { |
98 } | 137 //system start header is never compressed, so write to file immediately |
138 fwrite(buffer.data, 1, buffer.size, event_file); | |
139 } | |
140 buffer.size = 0; | |
99 } | 141 } |
100 | 142 |
101 //header formats | 143 //header formats |
102 //Single byte: 4 bit type, 4 bit delta (16-31) | 144 //Single byte: 4 bit type, 4 bit delta (16-31) |
103 //Three Byte: 8 bit type, 16-bit delta | 145 //Three Byte: 8 bit type, 16-bit delta |
104 //Four byte: 8-bit type, 24-bit signed delta | 146 //Four byte: 8-bit type, 24-bit signed delta |
105 #define FORMAT_3BYTE 0xE0 | 147 #define FORMAT_3BYTE 0xE0 |
106 #define FORMAT_4BYTE 0xF0 | 148 #define FORMAT_4BYTE 0xF0 |
107 static uint8_t last_event_type = 0xFF; | 149 static uint8_t last_event_type = 0xFF; |
108 static uint32_t last_delta; | 150 static uint32_t last_delta; |
109 static uint8_t multi_count; | |
110 static size_t multi_start; | |
111 static void event_header(uint8_t type, uint32_t cycle) | 151 static void event_header(uint8_t type, uint32_t cycle) |
112 { | 152 { |
113 uint32_t delta = cycle - last; | 153 uint32_t delta = cycle - last; |
114 if (multi_count) { | 154 if (multi_count) { |
115 if (type != last_event_type || delta != last_delta) { | 155 if (type != last_event_type || delta != last_delta) { |
116 buffer.data[multi_start] |= multi_count - 2; | 156 finish_multi(); |
117 multi_count = 0; | |
118 } else { | 157 } else { |
119 ++multi_count; | 158 ++multi_count; |
120 if (multi_count == 17) { | 159 if (multi_count == 17) { |
121 buffer.data[multi_start] |= multi_count - 2; | 160 finish_multi(); |
122 last_event_type = 0xFF; | 161 last_event_type = 0xFF; |
123 multi_count = 0; | |
124 } | 162 } |
125 return; | 163 return; |
126 } | 164 } |
127 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) { | 165 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) { |
128 //make some room | 166 //make some room |
157 event_header(EVENT_ADJUST, cycle); | 195 event_header(EVENT_ADJUST, cycle); |
158 last = cycle - deduction; | 196 last = cycle - deduction; |
159 save_int32(&buffer, deduction); | 197 save_int32(&buffer, deduction); |
160 } | 198 } |
161 | 199 |
162 static size_t remote_send_progress[7]; | 200 static uint8_t *remote_send_progress[7]; |
163 static uint8_t remote_needs_state[7]; | 201 static uint8_t remote_needs_state[7]; |
164 static void flush_socket(void) | 202 static void flush_socket(void) |
165 { | 203 { |
166 int remote = accept(listen_sock, NULL, NULL); | 204 int remote = accept(listen_sock, NULL, NULL); |
167 if (remote != -1) { | 205 if (remote != -1) { |
172 remotes[num_remotes] = remote; | 210 remotes[num_remotes] = remote; |
173 remote_needs_state[num_remotes++] = 1; | 211 remote_needs_state[num_remotes++] = 1; |
174 current_system->save_state = EVENTLOG_SLOT + 1; | 212 current_system->save_state = EVENTLOG_SLOT + 1; |
175 } | 213 } |
176 } | 214 } |
177 size_t min_progress = 0; | 215 uint8_t *min_progress = compressed; |
178 for (int i = 0; i < num_remotes; i++) { | 216 for (int i = 0; i < num_remotes; i++) { |
179 int sent = 1; | 217 int sent = 1; |
180 if (remote_needs_state[i]) { | 218 if (remote_needs_state[i]) { |
181 remote_send_progress[i] = buffer.size; | 219 remote_send_progress[i] = output_stream.next_out; |
182 } else { | 220 } else { |
183 uint8_t buffer[1500]; | 221 uint8_t buffer[1500]; |
184 int bytes = recv(remotes[i], buffer, sizeof(buffer), 0); | 222 int bytes = recv(remotes[i], buffer, sizeof(buffer), 0); |
185 for (int j = 0; j < bytes; j++) | 223 for (int j = 0; j < bytes; j++) |
186 { | 224 { |
208 warning("Unrecognized remote command %X\n", cmd); | 246 warning("Unrecognized remote command %X\n", cmd); |
209 j = bytes; | 247 j = bytes; |
210 } | 248 } |
211 } | 249 } |
212 } | 250 } |
213 while (sent && buffer.size - remote_send_progress[i]) | 251 while (sent && output_stream.next_out > remote_send_progress[i]) |
214 { | 252 { |
215 sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0); | 253 sent = send(remotes[i], remote_send_progress[i], output_stream.next_out - remote_send_progress[i], 0); |
216 if (sent >= 0) { | 254 if (sent >= 0) { |
217 remote_send_progress[i] += sent; | 255 remote_send_progress[i] += sent; |
218 } else if (socket_error_is_wouldblock()) { | 256 } else if (socket_error_is_wouldblock()) { |
219 socket_close(remotes[i]); | 257 socket_close(remotes[i]); |
220 remotes[i] = remotes[num_remotes-1]; | 258 remotes[i] = remotes[num_remotes-1]; |
227 if (remote_send_progress[i] > min_progress) { | 265 if (remote_send_progress[i] > min_progress) { |
228 min_progress = remote_send_progress[i]; | 266 min_progress = remote_send_progress[i]; |
229 } | 267 } |
230 } | 268 } |
231 } | 269 } |
232 if (min_progress == buffer.size) { | 270 if (min_progress == output_stream.next_out) { |
233 buffer.size = 0; | 271 output_stream.next_out = compressed; |
234 memset(remote_send_progress, 0, sizeof(remote_send_progress)); | 272 output_stream.avail_out = compressed_storage; |
235 multi_count = 0; | 273 for (int i = 0; i < num_remotes; i++) { |
236 last_event_type = 0xFF; | 274 remote_send_progress[i] = compressed; |
275 } | |
237 } | 276 } |
238 } | 277 } |
239 | 278 |
240 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload) | 279 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload) |
241 { | 280 { |
243 return; | 282 return; |
244 } | 283 } |
245 event_header(type, cycle); | 284 event_header(type, cycle); |
246 last = cycle; | 285 last = cycle; |
247 save_buffer8(&buffer, payload, size); | 286 save_buffer8(&buffer, payload, size); |
248 if (listen_sock && buffer.size > 1280) { | 287 if (!multi_count) { |
249 if (multi_count) { | 288 last_event_type = 0xFF; |
250 buffer.data[multi_start] |= multi_count - 2; | 289 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data); |
251 multi_count = 0; | 290 int result = deflate(&output_stream, Z_NO_FLUSH); |
252 last_event_type = 0xFF; | 291 if (result != Z_OK) { |
253 } | 292 fatal_error("deflate returned %d\n", result); |
254 flush_socket(); | 293 } |
294 if (listen_sock) { | |
295 if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) { | |
296 flush_socket(); | |
297 } | |
298 } else if (!output_stream.avail_out) { | |
299 fwrite(compressed, 1, compressed_storage, event_file); | |
300 output_stream.next_out = compressed; | |
301 output_stream.avail_out = compressed_storage; | |
302 } | |
303 if (!output_stream.avail_in) { | |
304 buffer.size = 0; | |
305 output_stream.next_in = buffer.data; | |
306 } | |
255 } | 307 } |
256 } | 308 } |
257 | 309 |
258 static uint32_t last_word_address; | 310 static uint32_t last_word_address; |
259 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value) | 311 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value) |
298 } | 350 } |
299 } | 351 } |
300 return total; | 352 return total; |
301 } | 353 } |
302 | 354 |
355 void deflate_flush(uint8_t full) | |
356 { | |
357 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data); | |
358 while (output_stream.avail_in) | |
359 { | |
360 if (!output_stream.avail_out) { | |
361 size_t old_storage = compressed_storage; | |
362 uint8_t *old_compressed = compressed; | |
363 compressed_storage *= 2; | |
364 compressed = realloc(compressed, compressed_storage); | |
365 output_stream.next_out = compressed + old_storage; | |
366 output_stream.avail_out = old_storage; | |
367 for (int i = 0; i < num_remotes; i++) { | |
368 if (!remote_needs_state[i]) { | |
369 remote_send_progress[i] = compressed + (remote_send_progress[i] - old_compressed); | |
370 } | |
371 } | |
372 } | |
373 int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH); | |
374 if (result != (full ? Z_STREAM_END : Z_OK)) { | |
375 fatal_error("deflate returned %d\n", result); | |
376 } | |
377 if (full) { | |
378 result = deflateReset(&output_stream); | |
379 if (result != Z_OK) { | |
380 fatal_error("deflateReset returned %d\n", result); | |
381 } | |
382 } | |
383 } | |
384 output_stream.next_in = buffer.data; | |
385 buffer.size = 0; | |
386 } | |
387 | |
303 void event_state(uint32_t cycle, serialize_buffer *state) | 388 void event_state(uint32_t cycle, serialize_buffer *state) |
304 { | 389 { |
305 if (!fully_active) { | 390 if (!fully_active) { |
306 last = cycle; | 391 last = cycle; |
307 } | 392 } |
309 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last, | 394 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last, |
310 last_word_address >> 16, last_word_address >> 8, last_word_address, | 395 last_word_address >> 16, last_word_address >> 8, last_word_address, |
311 last_byte_address >> 8, last_byte_address, | 396 last_byte_address >> 8, last_byte_address, |
312 state->size >> 16, state->size >> 8, state->size | 397 state->size >> 16, state->size >> 8, state->size |
313 }; | 398 }; |
399 uint8_t sent_system_start = 0; | |
314 for (int i = 0; i < num_remotes; i++) | 400 for (int i = 0; i < num_remotes; i++) |
315 { | 401 { |
316 if (remote_needs_state[i]) { | 402 if (remote_needs_state[i]) { |
317 if( | 403 if (send_all(remotes[i], system_start, system_start_size, 0) == system_start_size) { |
318 send_all(remotes[i], system_start, system_start_size, 0) == system_start_size | 404 sent_system_start = 1; |
319 && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header) | |
320 && send_all(remotes[i], state->data, state->size, 0) == state->size | |
321 ) { | |
322 remote_send_progress[i] = buffer.size; | |
323 remote_needs_state[i] = 0; | |
324 socket_blocking(remotes[i], 0); | |
325 int flag = 1; | |
326 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); | |
327 fully_active = 1; | |
328 } else { | 405 } else { |
329 socket_close(remotes[i]); | 406 socket_close(remotes[i]); |
330 remotes[i] = remotes[num_remotes-1]; | 407 remotes[i] = remotes[num_remotes-1]; |
331 remote_send_progress[i] = remote_send_progress[num_remotes-1]; | 408 remote_send_progress[i] = remote_send_progress[num_remotes-1]; |
332 remote_needs_state[i] = remote_needs_state[num_remotes-1]; | 409 remote_needs_state[i] = remote_needs_state[num_remotes-1]; |
333 num_remotes--; | 410 num_remotes--; |
334 i--; | 411 i--; |
335 } | 412 } |
336 } | 413 } |
337 } | 414 } |
415 if (sent_system_start) { | |
416 if (fully_active) { | |
417 if (multi_count) { | |
418 finish_multi(); | |
419 } | |
420 //full flush is needed so new and old clients can share a stream | |
421 deflate_flush(1); | |
422 } | |
423 save_buffer8(&buffer, header, sizeof(header)); | |
424 save_buffer8(&buffer, state->data, state->size); | |
425 size_t old_compressed_size = output_stream.next_out - compressed; | |
426 deflate_flush(1); | |
427 size_t state_size = output_stream.next_out - compressed - old_compressed_size; | |
428 for (int i = 0; i < num_remotes; i++) { | |
429 if (remote_needs_state[i]) { | |
430 if (send_all(remotes[i], compressed + old_compressed_size, state_size, 0) == state_size) { | |
431 remote_send_progress[i] = compressed + old_compressed_size; | |
432 remote_needs_state[i] = 0; | |
433 socket_blocking(remotes[i], 0); | |
434 int flag = 1; | |
435 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); | |
436 fully_active = 1; | |
437 } else { | |
438 socket_close(remotes[i]); | |
439 remotes[i] = remotes[num_remotes-1]; | |
440 remote_send_progress[i] = remote_send_progress[num_remotes-1]; | |
441 remote_needs_state[i] = remote_needs_state[num_remotes-1]; | |
442 num_remotes--; | |
443 i--; | |
444 } | |
445 } | |
446 } | |
447 output_stream.next_out = compressed + old_compressed_size; | |
448 output_stream.avail_out = compressed_storage - old_compressed_size; | |
449 } | |
338 } | 450 } |
339 | 451 |
340 void event_flush(uint32_t cycle) | 452 void event_flush(uint32_t cycle) |
341 { | 453 { |
342 if (!active) { | 454 if (!active) { |
343 return; | 455 return; |
344 } | 456 } |
345 if (fully_active) { | 457 if (fully_active) { |
346 event_log(EVENT_FLUSH, cycle, 0, NULL); | 458 event_header(EVENT_FLUSH, cycle); |
459 last = cycle; | |
460 | |
461 deflate_flush(0); | |
347 } | 462 } |
348 if (event_file) { | 463 if (event_file) { |
349 fwrite(buffer.data, 1, buffer.size, event_file); | 464 fwrite(compressed, 1, output_stream.next_out - compressed, event_file); |
350 fflush(event_file); | 465 fflush(event_file); |
351 buffer.size = 0; | 466 output_stream.next_out = compressed; |
352 multi_count = 0; | 467 output_stream.avail_out = compressed_storage; |
353 last_event_type = 0xFF; | |
354 } else if (listen_sock) { | 468 } else if (listen_sock) { |
355 flush_socket(); | 469 flush_socket(); |
356 } | 470 } |
471 } | |
472 | |
473 static void init_event_reader_common(event_reader *reader) | |
474 { | |
475 reader->last_cycle = 0; | |
476 reader->repeat_event = 0xFF; | |
477 reader->storage = 512 * 1024; | |
478 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage); | |
479 reader->buffer.size = 0; | |
480 memset(&reader->input_stream, 0, sizeof(reader->input_stream)); | |
481 | |
357 } | 482 } |
358 | 483 |
359 void init_event_reader(event_reader *reader, uint8_t *data, size_t size) | 484 void init_event_reader(event_reader *reader, uint8_t *data, size_t size) |
360 { | 485 { |
361 reader->socket = 0; | 486 reader->socket = 0; |
362 reader->last_cycle = 0; | 487 reader->last_cycle = 0; |
363 reader->repeat_event = 0xFF; | 488 reader->repeat_event = 0xFF; |
364 init_deserialize(&reader->buffer, data, size); | 489 init_event_reader_common(reader); |
490 uint8_t name_len = data[1]; | |
491 reader->buffer.size = name_len + 2; | |
492 memcpy(reader->buffer.data, data, reader->buffer.size); | |
493 reader->input_stream.next_in = data + reader->buffer.size; | |
494 reader->input_stream.avail_in = size - reader->buffer.size; | |
495 | |
496 int result = inflateInit(&reader->input_stream); | |
497 if (Z_OK != result) { | |
498 fatal_error("inflateInit returned %d\n", result); | |
499 } | |
500 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size; | |
501 reader->input_stream.avail_out = reader->storage - reader->buffer.size; | |
502 result = inflate(&reader->input_stream, Z_NO_FLUSH); | |
503 if (Z_OK != result && Z_STREAM_END != result) { | |
504 fatal_error("inflate returned %d\n", result); | |
505 } | |
506 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data; | |
365 } | 507 } |
366 | 508 |
367 void init_event_reader_tcp(event_reader *reader, char *address, char *port) | 509 void init_event_reader_tcp(event_reader *reader, char *address, char *port) |
368 { | 510 { |
369 struct addrinfo request, *result; | 511 struct addrinfo request, *result; |
380 } | 522 } |
381 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) { | 523 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) { |
382 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port); | 524 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port); |
383 } | 525 } |
384 | 526 |
385 reader->storage = 512 * 1024; | 527 init_event_reader_common(reader); |
386 reader->last_cycle = 0; | 528 reader->socket_buffer_size = 256 * 1024; |
387 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage); | 529 reader->socket_buffer = malloc(reader->socket_buffer_size); |
388 reader->buffer.size = 0; | 530 |
389 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2]) | 531 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2]) |
390 { | 532 { |
391 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); | 533 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); |
392 if (bytes < 0) { | 534 if (bytes < 0) { |
393 fatal_error("Failed to receive system init from %s:%s\n", address, port); | 535 fatal_error("Failed to receive system init from %s:%s\n", address, port); |
394 } | 536 } |
395 reader->buffer.size += bytes; | 537 reader->buffer.size += bytes; |
396 } | 538 } |
539 size_t init_msg_len = 3 + reader->buffer.data[2]; | |
540 memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len); | |
541 reader->input_stream.next_in = reader->socket_buffer; | |
542 reader->input_stream.avail_in = reader->buffer.size - init_msg_len; | |
543 reader->buffer.size = init_msg_len; | |
544 int res = inflateInit(&reader->input_stream); | |
545 if (Z_OK != res) { | |
546 fatal_error("inflateInit returned %d\n", res); | |
547 } | |
548 reader->input_stream.next_out = reader->buffer.data + init_msg_len; | |
549 reader->input_stream.avail_out = reader->storage - init_msg_len; | |
550 res = inflate(&reader->input_stream, Z_NO_FLUSH); | |
551 if (Z_OK != res && Z_BUF_ERROR != res) { | |
552 fatal_error("inflate returned %d in init_event_reader_tcp\n", res); | |
553 } | |
397 socket_blocking(reader->socket, 0); | 554 socket_blocking(reader->socket, 0); |
398 int flag = 1; | 555 int flag = 1; |
399 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); | 556 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); |
400 } | 557 } |
401 | 558 |
402 static void read_from_socket(event_reader *reader) | 559 static void read_from_socket(event_reader *reader) |
403 { | 560 { |
404 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) { | 561 if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) { |
405 reader->storage *= 2; | 562 reader->socket_buffer_size *= 2; |
406 uint8_t *new_buf = malloc(reader->storage); | 563 uint8_t *new_buf = malloc(reader->socket_buffer_size); |
407 memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); | 564 memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in); |
408 free(reader->buffer.data); | 565 free(reader->socket_buffer); |
409 reader->buffer.data = new_buf; | 566 reader->socket_buffer = new_buf; |
410 reader->buffer.size -= reader->buffer.cur_pos; | 567 reader->input_stream.next_in = new_buf; |
411 reader->buffer.cur_pos = 0; | 568 } else if ( |
412 } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) { | 569 reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in |
570 && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2 | |
571 ) { | |
572 memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in); | |
573 reader->input_stream.next_in = reader->socket_buffer; | |
574 } | |
575 uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in; | |
576 size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start; | |
577 int bytes = recv(reader->socket, space_start, space, 0); | |
578 if (bytes >= 0) { | |
579 reader->input_stream.avail_in += bytes; | |
580 } else if (!socket_error_is_wouldblock()) { | |
581 fatal_error("Connection closed, error = %X\n", socket_last_error()); | |
582 } | |
583 } | |
584 | |
585 static void inflate_flush(event_reader *reader) | |
586 { | |
587 if (reader->buffer.cur_pos > reader->storage / 2) { | |
413 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); | 588 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); |
414 reader->buffer.size -= reader->buffer.cur_pos; | 589 reader->buffer.size -= reader->buffer.cur_pos; |
415 reader->buffer.cur_pos = 0; | 590 reader->buffer.cur_pos = 0; |
416 } | 591 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size; |
417 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); | 592 reader->input_stream.avail_out = reader->storage - reader->buffer.size; |
418 if (bytes >= 0) { | 593 } |
419 reader->buffer.size += bytes; | 594 int result = inflate(&reader->input_stream, Z_SYNC_FLUSH); |
420 } else if (!socket_error_is_wouldblock()) { | 595 if (Z_OK != result && Z_STREAM_END != result) { |
421 fatal_error("Connection closed, error = %X\n", socket_last_error()); | 596 fatal_error("inflate returned %d\n", result); |
422 } | 597 } |
598 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data; | |
599 if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) { | |
600 inflateReset(&reader->input_stream); | |
601 if (reader->input_stream.avail_in) { | |
602 inflate_flush(reader); | |
603 } | |
604 } | |
605 | |
423 } | 606 } |
424 | 607 |
425 void reader_ensure_data(event_reader *reader, size_t bytes) | 608 void reader_ensure_data(event_reader *reader, size_t bytes) |
426 { | 609 { |
427 if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) { | 610 if (reader->buffer.size - reader->buffer.cur_pos < bytes) { |
428 socket_blocking(reader->socket, 1); | 611 if (reader->socket) { |
429 while (reader->buffer.size - reader->buffer.cur_pos < bytes) { | |
430 read_from_socket(reader); | 612 read_from_socket(reader); |
431 } | 613 } |
432 socket_blocking(reader->socket, 0); | 614 if (reader->input_stream.avail_in) { |
615 inflate_flush(reader); | |
616 } | |
617 if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) { | |
618 socket_blocking(reader->socket, 1); | |
619 while (reader->buffer.size - reader->buffer.cur_pos < bytes) { | |
620 read_from_socket(reader); | |
621 inflate_flush(reader); | |
622 } | |
623 socket_blocking(reader->socket, 0); | |
624 } | |
433 } | 625 } |
434 } | 626 } |
435 | 627 |
436 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) | 628 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) |
437 { | 629 { |
439 reader->repeat_remaining--; | 631 reader->repeat_remaining--; |
440 *cycle_out = reader->last_cycle + reader->repeat_delta; | 632 *cycle_out = reader->last_cycle + reader->repeat_delta; |
441 reader->last_cycle = *cycle_out; | 633 reader->last_cycle = *cycle_out; |
442 return reader->repeat_event; | 634 return reader->repeat_event; |
443 } | 635 } |
444 if (reader->socket) { | 636 reader_ensure_data(reader, 1); |
445 read_from_socket(reader); | |
446 reader_ensure_data(reader, 1); | |
447 } | |
448 uint8_t header = load_int8(&reader->buffer); | 637 uint8_t header = load_int8(&reader->buffer); |
449 uint8_t ret; | 638 uint8_t ret; |
450 uint32_t delta; | 639 uint32_t delta; |
451 uint8_t multi_start = 0; | 640 uint8_t multi_start = 0; |
452 if ((header & 0xF0) == (EVENT_MULTI << 4)) { | 641 if ((header & 0xF0) == (EVENT_MULTI << 4)) { |