forked from iMelki/Json2MySQL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjsonToMySQL.js
More file actions
152 lines (130 loc) · 4.9 KB
/
jsonToMySQL.js
File metadata and controls
152 lines (130 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Require modules:
const fs = require( "fs" );
const JSONStream = require( "JSONStream" );
const es = require('event-stream');
const debug = require('debug')('Main');
const path = require('path');
// Require Configuration settings:
const config = require('./config.js');
// Require DB Connection:
const db = require('./dbManipulator');
let first = true;
let i;
/**
* a handler for inputing config arguments
* @param argNum the index in the process.argv
* @param atrName the attribute name inside the config file
* @returns true if got input. false, otherwise.
*/
async function getConfigAtribute(argNum, atrName){
var ans = false;
if(process.argv[argNum] && process.argv[argNum].length>0){
config[atrName] = process.argv[argNum];
ans = true;
}else if(config[atrName] && config[atrName].length>0){
ans = true;
}
return ans;
}
// validate whether we got all the input needed to run the app
async function validateInput(){
debug('validating input..');
let i=2;
const gotJSONPath = await getConfigAtribute(i++, 'jsonPath');
if (gotJSONPath){
const gotDBName = await getConfigAtribute(i++, 'dbName');
if (gotDBName){
const gotTblName = await getConfigAtribute(i++, 'tblName');
if (gotTblName){
debug('Got all config input needed.');
}else{
throw new Error('you need to specify a JSON file path in config.js{jsonPath} or using the command\n "node index.js %file_path%"');
}
}else{
throw new Error('you need to specify a DB name in config.js{dbName} or using the command\n "node index.js %file_path% %DB_name%"');
}
}else{
throw new Error('you need to specify a JSON file path in config.js{jsonPath} or using the command\n "node index.js %file_path%"');
}
}
// create a stream to read & pass JS objs from the JSON file:
var getStream = function () {
try{
var stream = fs.createReadStream(config.jsonPath, {encoding: 'utf8'});
var parser = JSONStream.parse('*');
return stream.pipe(parser);
}catch(err){
throw new Error('JSON file corrupted or is not found!');
}
};
//start Streaming to get the first JSO and build the DB Table from its image
async function getTableFromFirstObject(){
return new Promise(function(res, rej){
debug('Streaming the first JSO from the input file to build a table..');
var j=0;
var firstStream = getStream();
firstStream.on('close', async () => { debug('bye First!');})
firstStream.pipe(es.mapSync(async function(obj) {
if (first){
//close. distroy, end
debug('ending firstStream..');
firstStream.end();
first = false;
debug('firstStream ended.');
var tableCreated = await db.buildTable(obj);
if (tableCreated) res();
else rej('Error creating table');
}else{
debug((j++)+' : how did you get through the first object?!');
}
}));
});
}
//start Streaming and deal with each JSO separately
async function startStreaming(){
return new Promise(function(res, rej){
debug('Streaming all JSOs from the input file..');
var secondStream = getStream();
secondStream.on('close', async () => {await finishApp(); })
secondStream.pipe(es.mapSync(async function (obj) {
try{
await db.insertToDB(obj);
}catch(err){
console.error(err.message);
}
}));
});
}
async function finishApp(){
debug('finished streaming. queing JSOs to be added to the DB..')
await db.endConnection();
console.log('\nThe job is done. all Documents have been moved to the new location, found at: \n');
console.log(config.host+', under database named "'+config.dbName+'", inside table named "'+config.tblName+'".\n');
debug('Good bye! :)');
}
// Main function:
exports.startScript = async function(dbName, mySqlUser, mySqlPass, rootPass){
try{
config.initDbParams(dbName, mySqlUser, mySqlPass, rootPass);
console.log('JSON2MySQL started. \nworking..');
//await validateInput();
//await db.init(config.host, config.user, config.password, config.dbName, config.tblName);
await db.init(config.host, config.user, config.password, config.dbName, config.tblName);
await db.runDatabase();
// dlt next row when app's rdy:
await db.dropTable(config.tblName);
////////
await getTableFromFirstObject();
await startStreaming();
}catch(err){
console.error(err.message);
await db.endConnection();
}
}
exports.getAllAccounts = async function() {
try{
return await db.getAllRecords();
}catch(err){
console.error(err);
}
}