Skip to content

Commit 17c6a67

Browse files
committed
Introduce node.stdio
Remove old stdout, stderr, stdin objects.
1 parent 0727fcc commit 17c6a67

13 files changed

Lines changed: 793 additions & 38 deletions

File tree

deps/coupling/coupling.c

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/* Copyright (c) 2009 Ryan Dahl (ry@tinyclouds.org)
2+
*
3+
* All rights reserved.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining
6+
* a copy of this software and associated documentation files (the
7+
* "Software"), to deal in the Software without restriction, including
8+
* without limitation the rights to use, copy, modify, merge, publish,
9+
* distribute, sublicense, and/or sell copies of the Software, and to
10+
* permit persons to whom the Software is furnished to do so, subject to
11+
* the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be
14+
* included in all copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17+
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18+
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19+
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20+
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21+
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22+
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23+
*/
24+
#include "coupling.h"
25+
26+
#include <sys/types.h>
27+
#include <sys/uio.h>
28+
#include <sys/select.h>
29+
#include <stdlib.h>
30+
#include <stdio.h>
31+
#include <unistd.h>
32+
#include <errno.h>
33+
#include <fcntl.h>
34+
#include <assert.h>
35+
36+
#include <pthread.h>
37+
38+
#ifdef PIPE_BUF
39+
# define BUFSIZE PIPE_BUF
40+
#else
41+
# define BUFSIZE 4096
42+
#endif
43+
44+
#define MAX(a,b) ((a) > (b) ? (a) : (b))
45+
46+
// ring buffer
47+
typedef struct {
48+
int head;
49+
int tail;
50+
int size;
51+
char buf[BUFSIZE];
52+
} ring_buffer;
53+
54+
static inline void
55+
ring_buffer_inspect (ring_buffer *ring)
56+
{
57+
printf("size %5d head %5d tail %5d\n", ring->size, ring->head, ring->tail);
58+
}
59+
60+
static inline void
61+
ring_buffer_init (ring_buffer *ring)
62+
{
63+
ring->head = 0;
64+
ring->tail = 0;
65+
ring->size = 0;
66+
}
67+
68+
static inline int
69+
ring_buffer_filled_p (ring_buffer *ring)
70+
{
71+
assert(BUFSIZE - (long)ring->size >= 0);
72+
return (BUFSIZE == ring->size);
73+
}
74+
75+
static inline int
76+
ring_buffer_empty_p (ring_buffer *ring)
77+
{
78+
return 0 == ring->size;
79+
}
80+
81+
static ssize_t
82+
ring_buffer_pull (ring_buffer *ring, int fd)
83+
{
84+
// DO NOT CALL WHEN FILLED
85+
assert(!ring_buffer_filled_p(ring));
86+
87+
struct iovec iov[2];
88+
int iovcnt = 1;
89+
90+
// Very tough logic. Can you follow? Barely can I.
91+
iov[0].iov_base = ring->buf + ring->tail;
92+
if (ring->tail < ring->head) {
93+
iov[0].iov_len = ring->head - ring->tail;
94+
} else {
95+
iov[0].iov_len = BUFSIZE - ring->tail;
96+
if (ring->head != 0) {
97+
iovcnt = 2;
98+
iov[1].iov_base = ring->buf;
99+
iov[1].iov_len = ring->head;
100+
}
101+
}
102+
103+
int r = readv(fd, iov, iovcnt);
104+
105+
if (r > 0) {
106+
ring->size += r;
107+
ring->tail = (ring->tail + r) % BUFSIZE;
108+
}
109+
assert(ring->size <= BUFSIZE);
110+
111+
return r;
112+
}
113+
114+
static ssize_t
115+
ring_buffer_push (ring_buffer *ring, int fd)
116+
{
117+
// DO NOT CALL WHEN EMPTY
118+
assert(!ring_buffer_empty_p(ring));
119+
120+
struct iovec iov[2];
121+
int iovcnt = 1;
122+
123+
iov[0].iov_base = ring->buf + ring->head;
124+
if (ring->head < ring->tail) {
125+
iov[0].iov_len = ring->tail - ring->head;
126+
} else {
127+
iov[0].iov_len = BUFSIZE - ring->head;
128+
if (ring->tail != 0) {
129+
iovcnt = 2;
130+
iov[1].iov_base = ring->buf;
131+
iov[1].iov_len = ring->tail;
132+
}
133+
}
134+
135+
int r = writev(fd, iov, iovcnt);
136+
137+
if (r > 0) {
138+
ring->size -= r;
139+
ring->head = (ring->head + r) % BUFSIZE;
140+
}
141+
assert(0 <= (long)ring->size);
142+
143+
return r;
144+
}
145+
146+
147+
static void
148+
pump (int pullfd, int pushfd)
149+
{
150+
int r;
151+
ring_buffer ring;
152+
fd_set readfds, writefds, exceptfds;
153+
154+
ring_buffer_init(&ring);
155+
156+
int maxfd;
157+
158+
while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) {
159+
FD_ZERO(&exceptfds);
160+
FD_ZERO(&readfds);
161+
FD_ZERO(&writefds);
162+
163+
maxfd = pushfd;
164+
FD_SET(pushfd, &exceptfds);
165+
166+
if (pullfd >= 0) {
167+
FD_SET(pullfd, &exceptfds);
168+
maxfd = MAX(pushfd, pullfd);
169+
if (!ring_buffer_filled_p(&ring)) FD_SET(pullfd, &readfds);
170+
}
171+
172+
if (!ring_buffer_empty_p(&ring)) {
173+
FD_SET(pushfd, &writefds);
174+
}
175+
176+
r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL);
177+
178+
if (r < 0 || FD_ISSET(pushfd, &exceptfds)) {
179+
pushfd = pullfd = -1;
180+
return;
181+
}
182+
183+
if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) {
184+
pullfd = -1;
185+
}
186+
187+
if (pullfd >= 0 && FD_ISSET(pullfd, &readfds)) {
188+
r = ring_buffer_pull(&ring, pullfd);
189+
if (r == 0) {
190+
/* eof */
191+
pullfd = -1;
192+
193+
} else if (r < 0) {
194+
if (errno != EINTR && errno != EAGAIN) goto error;
195+
}
196+
}
197+
198+
if (FD_ISSET(pushfd, &writefds)) {
199+
r = ring_buffer_push(&ring, pushfd);
200+
if (r < 0) {
201+
switch (errno) {
202+
case EINTR:
203+
case EAGAIN:
204+
continue;
205+
206+
case EPIPE:
207+
/* TODO catch SIGPIPE? */
208+
pushfd = pullfd = -1;
209+
return;
210+
211+
default:
212+
goto error;
213+
}
214+
}
215+
}
216+
}
217+
218+
return;
219+
220+
error:
221+
perror("(coupling) pump");
222+
}
223+
224+
static inline int
225+
set_nonblock (int fd)
226+
{
227+
int flags = fcntl(fd, F_GETFL, 0);
228+
if (flags == -1) return -1;
229+
230+
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
231+
if (r == -1) return -1;
232+
233+
return 0;
234+
}
235+
236+
struct coupling {
237+
int is_pull;
238+
int pullfd;
239+
int pushfd;
240+
int exposedfd;
241+
pthread_t tid;
242+
};
243+
244+
static void *
245+
pump_thread (void *data)
246+
{
247+
struct coupling *c = (struct coupling*)data;
248+
249+
pump(c->pullfd, c->pushfd);
250+
251+
return NULL;
252+
}
253+
254+
static struct coupling*
255+
create_coupling (int fd, int is_pull)
256+
{
257+
int pipefd[2];
258+
259+
struct coupling *c = malloc(sizeof(struct coupling));
260+
if (!c) return NULL;
261+
262+
int r = pipe(pipefd);
263+
if (r < 0) return NULL;
264+
265+
r = set_nonblock(pipefd[0]);
266+
if (r < 0) return NULL;
267+
assert(pipefd[0] >= 0);
268+
269+
r = set_nonblock(pipefd[1]);
270+
if (r < 0) return NULL;
271+
assert(pipefd[1] >= 0);
272+
273+
if (is_pull) {
274+
c->is_pull = 1;
275+
c->pullfd = fd;
276+
c->pushfd = pipefd[1];
277+
c->exposedfd = pipefd[0];
278+
} else {
279+
c->is_pull = 0;
280+
c->pushfd = fd;
281+
c->pullfd = pipefd[0];
282+
c->exposedfd = pipefd[1];
283+
}
284+
285+
r = pthread_create(&c->tid, NULL, pump_thread, c);
286+
if (r < 0) return NULL;
287+
288+
return c;
289+
}
290+
291+
struct coupling*
292+
coupling_new_pull (int fd)
293+
{
294+
return create_coupling(fd, 1);
295+
}
296+
297+
struct coupling*
298+
coupling_new_push (int fd)
299+
{
300+
return create_coupling(fd, 0);
301+
}
302+
303+
int
304+
coupling_nonblocking_fd (struct coupling *c)
305+
{
306+
return c->exposedfd;
307+
}
308+
309+
void
310+
coupling_join (struct coupling *c)
311+
{
312+
int r = pthread_join(c->tid, NULL);
313+
assert(r == 0);
314+
}
315+
316+
void
317+
coupling_destroy (struct coupling *c)
318+
{
319+
close(c->is_pull ? c->pushfd : c->pullfd);
320+
free(c);
321+
}

deps/coupling/coupling.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/* Copyright (c) 2009 Ryan Dahl (ry@tinyclouds.org)
2+
*
3+
* All rights reserved.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining
6+
* a copy of this software and associated documentation files (the
7+
* "Software"), to deal in the Software without restriction, including
8+
* without limitation the rights to use, copy, modify, merge, publish,
9+
* distribute, sublicense, and/or sell copies of the Software, and to
10+
* permit persons to whom the Software is furnished to do so, subject to
11+
* the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be
14+
* included in all copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17+
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18+
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19+
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20+
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21+
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22+
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23+
*/
24+
#ifndef coupling_h
25+
#define coupling_h
26+
#ifdef __cplusplus
27+
extern "C" {
28+
#endif
29+
30+
struct coupling;
31+
32+
struct coupling* coupling_new_pull (int fd);
33+
struct coupling* coupling_new_push (int fd);
34+
int coupling_nonblocking_fd (struct coupling*);
35+
void coupling_join (struct coupling*);
36+
void coupling_destroy (struct coupling*);
37+
38+
39+
#ifdef __cplusplus
40+
}
41+
#endif
42+
#endif

0 commit comments

Comments
 (0)