@@ -51,7 +51,7 @@ const $readCSV = async (filePath: string, options?: CsvInputOptionsNode): Promis
5151 const frameConfig = options ?. frameConfig || { }
5252
5353 if ( filePath . startsWith ( "http" ) || filePath . startsWith ( "https" ) ) {
54- return new Promise ( resolve => {
54+ return new Promise ( ( resolve , reject ) => {
5555 const optionsWithDefaults = {
5656 header : true ,
5757 dynamicTyping : true ,
@@ -60,6 +60,13 @@ const $readCSV = async (filePath: string, options?: CsvInputOptionsNode): Promis
6060 }
6161
6262 const dataStream = request . get ( filePath ) ;
63+ // reject any non-2xx status codes
64+ dataStream . on ( 'response' , ( response : any ) => {
65+ if ( response . statusCode < 200 || response . statusCode >= 300 ) {
66+ reject ( new Error ( `HTTP ${ response . statusCode } : ${ response . statusMessage } ` ) ) ;
67+ }
68+ } ) ;
69+
6370 const parseStream : any = Papa . parse ( Papa . NODE_STREAM_INPUT , optionsWithDefaults as any ) ;
6471 dataStream . pipe ( parseStream ) ;
6572
@@ -74,17 +81,24 @@ const $readCSV = async (filePath: string, options?: CsvInputOptionsNode): Promis
7481 } ) ;
7582
7683 } else {
77- return new Promise ( resolve => {
78- const fileStream = fs . createReadStream ( filePath )
79- Papa . parse ( fileStream , {
80- header : true ,
81- dynamicTyping : true ,
82- ...options ,
83- complete : results => {
84- const df = new DataFrame ( results . data , frameConfig ) ;
85- resolve ( df ) ;
84+ return new Promise ( ( resolve , reject ) => {
85+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
86+ if ( err ) {
87+ reject ( "ENOENT: no such file or directory" ) ;
8688 }
87- } ) ;
89+
90+ const fileStream = fs . createReadStream ( filePath )
91+
92+ Papa . parse ( fileStream , {
93+ header : true ,
94+ dynamicTyping : true ,
95+ ...options ,
96+ complete : results => {
97+ const df = new DataFrame ( results . data , frameConfig ) ;
98+ resolve ( df ) ;
99+ }
100+ } ) ;
101+ } )
88102 } ) ;
89103 }
90104} ;
@@ -113,9 +127,17 @@ const $streamCSV = async (filePath: string, callback: (df: DataFrame) => void, o
113127 dynamicTyping : true ,
114128 ...options ,
115129 }
116- return new Promise ( resolve => {
130+ return new Promise ( ( resolve , reject ) => {
117131 let count = - 1
118132 const dataStream = request . get ( filePath ) ;
133+
134+ // reject any non-2xx status codes
135+ dataStream . on ( 'response' , ( response : any ) => {
136+ if ( response . statusCode < 200 || response . statusCode >= 300 ) {
137+ reject ( new Error ( `HTTP ${ response . statusCode } : ${ response . statusMessage } ` ) ) ;
138+ }
139+ } ) ;
140+
119141 const parseStream : any = Papa . parse ( Papa . NODE_STREAM_INPUT , optionsWithDefaults ) ;
120142 dataStream . pipe ( parseStream ) ;
121143
@@ -130,19 +152,26 @@ const $streamCSV = async (filePath: string, callback: (df: DataFrame) => void, o
130152
131153 } ) ;
132154 } else {
133- const fileStream = fs . createReadStream ( filePath )
134155
135- return new Promise ( resolve => {
136- let count = - 1
137- Papa . parse ( fileStream , {
138- header : true ,
139- dynamicTyping : true ,
140- ...options ,
141- step : results => {
142- const df = new DataFrame ( [ results . data ] , { ...frameConfig , index : [ count ++ ] } ) ;
143- callback ( df ) ;
144- } ,
145- complete : ( ) => resolve ( null )
156+ return new Promise ( ( resolve , reject ) => {
157+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
158+ if ( err ) {
159+ reject ( "ENOENT: no such file or directory" ) ;
160+ }
161+
162+ const fileStream = fs . createReadStream ( filePath )
163+
164+ let count = - 1
165+ Papa . parse ( fileStream , {
166+ header : true ,
167+ dynamicTyping : true ,
168+ ...options ,
169+ step : results => {
170+ const df = new DataFrame ( [ results . data ] , { ...frameConfig , index : [ count ++ ] } ) ;
171+ callback ( df ) ;
172+ } ,
173+ complete : ( ) => resolve ( null )
174+ } ) ;
146175 } ) ;
147176 } ) ;
148177 }
@@ -228,6 +257,14 @@ const $openCsvInputStream = (filePath: string, options: CsvInputOptionsNode) =>
228257
229258 if ( filePath . startsWith ( "http" ) || filePath . startsWith ( "https" ) ) {
230259 const dataStream = request . get ( filePath ) ;
260+
261+ // reject any non-2xx status codes
262+ dataStream . on ( 'response' , ( response : any ) => {
263+ if ( response . statusCode < 200 || response . statusCode >= 300 ) {
264+ throw new Error ( `HTTP ${ response . statusCode } : ${ response . statusMessage } ` ) ;
265+ }
266+ } ) ;
267+
231268 const parseStream : any = Papa . parse ( Papa . NODE_STREAM_INPUT , { header, dynamicTyping : true , ...options } ) ;
232269 dataStream . pipe ( parseStream ) ;
233270 let count = - 1
@@ -258,37 +295,44 @@ const $openCsvInputStream = (filePath: string, options: CsvInputOptionsNode) =>
258295 return csvInputStream ;
259296 } else {
260297 const fileStream = fs . createReadStream ( filePath )
261- let count = - 1
262- Papa . parse ( fileStream , {
263- ...{ header, dynamicTyping : true , ...options } ,
264- step : results => {
265- if ( isFirstChunk ) {
266- if ( header === true ) {
267- ndFrameColumnNames = results . meta . fields || [ ]
268- } else {
269- ndFrameColumnNames = results . data
298+
299+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
300+ if ( err ) {
301+ throw new Error ( "ENOENT: no such file or directory" ) ;
302+ }
303+
304+ let count = - 1
305+ Papa . parse ( fileStream , {
306+ ...{ header, dynamicTyping : true , ...options } ,
307+ step : results => {
308+ if ( isFirstChunk ) {
309+ if ( header === true ) {
310+ ndFrameColumnNames = results . meta . fields || [ ]
311+ } else {
312+ ndFrameColumnNames = results . data
313+ }
314+ isFirstChunk = false
315+ return
270316 }
271- isFirstChunk = false
272- return
317+
318+ const df = new DataFrame ( [ results . data ] , {
319+ columns : ndFrameColumnNames ,
320+ index : [ count ++ ]
321+ } )
322+
323+ csvInputStream . push ( df ) ;
324+ } ,
325+ complete : ( result : any ) => {
326+ csvInputStream . push ( null ) ;
327+ return null
328+ } ,
329+ error : ( err ) => {
330+ csvInputStream . emit ( "error" , err ) ;
273331 }
332+ } ) ;
274333
275- const df = new DataFrame ( [ results . data ] , {
276- columns : ndFrameColumnNames ,
277- index : [ count ++ ]
278- } )
279-
280- csvInputStream . push ( df ) ;
281- } ,
282- complete : ( result : any ) => {
283- csvInputStream . push ( null ) ;
284- return null
285- } ,
286- error : ( err ) => {
287- csvInputStream . emit ( "error" , err ) ;
288- }
334+ return csvInputStream ;
289335 } ) ;
290-
291- return csvInputStream ;
292336 }
293337} ;
294338
@@ -314,39 +358,45 @@ const $openCsvInputStream = (filePath: string, options: CsvInputOptionsNode) =>
314358 * ```
315359 */
316360const $writeCsvOutputStream = ( filePath : string , options : CsvInputOptionsNode ) => {
317- let isFirstRow = true
318- const fileOutputStream = fs . createWriteStream ( filePath )
319- const csvOutputStream = new stream . Writable ( { objectMode : true } )
361+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
362+ if ( err ) {
363+ throw new Error ( "ENOENT: no such file or directory" ) ;
364+ }
365+
366+ let isFirstRow = true
367+ const fileOutputStream = fs . createWriteStream ( filePath )
368+ const csvOutputStream = new stream . Writable ( { objectMode : true } )
320369
321- csvOutputStream . _write = ( chunk : DataFrame | Series , encoding , callback ) => {
370+ csvOutputStream . _write = ( chunk : DataFrame | Series , encoding , callback ) => {
322371
323- if ( chunk instanceof DataFrame ) {
372+ if ( chunk instanceof DataFrame ) {
324373
325- if ( isFirstRow ) {
326- isFirstRow = false
327- fileOutputStream . write ( $toCSV ( chunk , { header : true , ...options } ) ) ;
374+ if ( isFirstRow ) {
375+ isFirstRow = false
376+ fileOutputStream . write ( $toCSV ( chunk , { header : true , ...options } ) ) ;
377+ callback ( ) ;
378+ } else {
379+ fileOutputStream . write ( $toCSV ( chunk , { header : false , ...options } ) ) ;
380+ callback ( ) ;
381+ }
382+
383+ } else if ( chunk instanceof Series ) {
384+
385+ fileOutputStream . write ( $toCSV ( chunk ) ) ;
328386 callback ( ) ;
387+
329388 } else {
330- fileOutputStream . write ( $toCSV ( chunk , { header : false , ...options } ) ) ;
331- callback ( ) ;
389+ csvOutputStream . emit ( "error" , new Error ( "ValueError: Intermediate chunk must be either a Series or DataFrame" ) )
332390 }
333391
334- } else if ( chunk instanceof Series ) {
335-
336- fileOutputStream . write ( $toCSV ( chunk ) ) ;
337- callback ( ) ;
338-
339- } else {
340- csvOutputStream . emit ( "error" , new Error ( "ValueError: Intermediate chunk must be either a Series or DataFrame" ) )
341392 }
342393
343- }
344-
345- csvOutputStream . on ( "finish" , ( ) => {
346- fileOutputStream . end ( )
347- } )
394+ csvOutputStream . on ( "finish" , ( ) => {
395+ fileOutputStream . end ( )
396+ } )
348397
349- return csvOutputStream
398+ return csvOutputStream
399+ } ) ;
350400}
351401
352402
@@ -358,4 +408,4 @@ export {
358408 $toCSV ,
359409 $writeCsvOutputStream ,
360410 $openCsvInputStream ,
361- }
411+ }
0 commit comments