@@ -26,7 +26,7 @@ JsonStream::~JsonStream() {
2626#endif
2727}
2828
29-
29+ /* // this implementation is untested and unlikely to work
3030void JsonStream::set_new_buffer(const char *buf, size_t len) {
3131#ifdef SIMDJSON_THREADS_ENABLED
3232 if(stage_1_thread.joinable()) {
@@ -35,41 +35,40 @@ void JsonStream::set_new_buffer(const char *buf, size_t len) {
3535#endif
3636 this->_buf = buf;
3737 this->_len = len;
38- _batch_size = 0 ;
39- _batch_size = 0 ;
38+ _batch_size = 0; // why zero?
39+ _batch_size = 0; // waat??
4040 next_json = 0;
4141 current_buffer_loc = 0;
4242 n_parsed_docs = 0;
43- error_on_last_attempt= false ;
4443 load_next_batch = true;
45- }
44+ }*/
4645
47- // todo: this code is too complicated, it should be greatly simplified
46+
47+ #ifdef SIMDJSON_THREADS_ENABLED
48+
49+ // threaded version of json_parse
50+ // todo: simplify this code further
4851int JsonStream::json_parse (ParsedJson &pj) {
49- if (pj.byte_capacity == 0 ) {
52+ if (unlikely ( pj.byte_capacity == 0 ) ) {
5053 const bool allocok = pj.allocate_capacity (_batch_size);
51- const bool allocok_thread = pj_thread.allocate_capacity (_batch_size);
52- if (!allocok || !allocok_thread) {
53- std::cerr << " can't allocate memory" << std::endl;
54- return false ;
54+ if (!allocok) {
55+ pj.error_code = simdjson::MEMALLOC;
56+ return pj.error_code ;
5557 }
58+ } else if (unlikely (pj.byte_capacity < _batch_size)) {
59+ pj.error_code = simdjson::CAPACITY;
60+ return pj.error_code ;
5661 }
57- else if (pj .byte_capacity < _batch_size) {
58- return simdjson::CAPACITY ;
59- }
60- # ifdef SIMDJSON_THREADS_ENABLED
61- if (current_buffer_loc == last_json_buffer_loc) {
62- load_next_batch = true ;
62+ if ( unlikely (pj_thread .byte_capacity < _batch_size) ) {
63+ const bool allocok_thread = pj_thread. allocate_capacity (_batch_size) ;
64+ if (!allocok_thread) {
65+ pj. error_code = simdjson::MEMALLOC;
66+ return pj. error_code ;
67+ }
6368 }
64- #endif
65-
66- if (load_next_batch) {
67- #ifdef SIMDJSON_THREADS_ENABLED
69+ if (unlikely (load_next_batch)) {
6870 // First time loading
6971 if (!stage_1_thread.joinable ()) {
70- _buf = _buf + current_buffer_loc;
71- _len -= current_buffer_loc;
72- n_bytes_parsed += current_buffer_loc;
7372 _batch_size = std::min (_batch_size, _len);
7473 _batch_size = trimmed_length_safe_utf8 ((const char *)_buf, _batch_size);
7574 if (_batch_size == 0 ) {
@@ -100,8 +99,8 @@ int JsonStream::json_parse(ParsedJson &pj) {
10099 _buf = _buf + last_json_buffer_loc;
101100 _len -= last_json_buffer_loc;
102101 n_bytes_parsed += last_json_buffer_loc;
103- last_json_buffer_loc = 0 ; // because we want to use it in the if above.
104102 }
103+ // let us decide whether we will start a new thread
105104 if (_len - _batch_size > 0 ) {
106105 last_json_buffer_loc = pj.structural_indexes [find_last_json_buf_idx (_buf,_batch_size,pj)];
107106 _batch_size = std::min (_batch_size, _len - last_json_buffer_loc);
@@ -122,15 +121,43 @@ int JsonStream::json_parse(ParsedJson &pj) {
122121 });
123122 }
124123 }
124+ next_json = 0 ;
125+ load_next_batch = false ;
126+ } // load_next_batch
127+ int res = best_stage2 (_buf, _len, pj, next_json);
128+ if (res == simdjson::SUCCESS_AND_HAS_MORE) {
129+ n_parsed_docs++;
130+ current_buffer_loc = pj.structural_indexes [next_json];
131+ load_next_batch = (current_buffer_loc == last_json_buffer_loc);
132+ } else if (res == simdjson::SUCCESS) {
133+ n_parsed_docs++;
134+ if (_len > _batch_size) {
135+ current_buffer_loc = pj.structural_indexes [next_json - 1 ];
136+ load_next_batch = true ;
137+ res = simdjson::SUCCESS_AND_HAS_MORE;
138+ }
139+ }
140+ return res;
141+ }
142+
143+ #else // SIMDJSON_THREADS_ENABLED
125144
126- // If we loaded a perfect amount of documents last time, we need to skip the first element,
127- // because it represents the end of the last document
128- next_json = next_json == 1 ;
129- #else
145+ // single-threaded version of json_parse
146+ int JsonStream::json_parse (ParsedJson &pj) {
147+ if (unlikely (pj.byte_capacity == 0 )) {
148+ const bool allocok = pj.allocate_capacity (_batch_size);
149+ if (!allocok) {
150+ pj.error_code = simdjson::MEMALLOC;
151+ return pj.error_code ;
152+ }
153+ } else if (unlikely (pj.byte_capacity < _batch_size)) {
154+ pj.error_code = simdjson::CAPACITY;
155+ return pj.error_code ;
156+ }
157+ if (unlikely (load_next_batch)) {
130158 _buf = _buf + current_buffer_loc;
131159 _len -= current_buffer_loc;
132160 n_bytes_parsed += current_buffer_loc;
133-
134161 _batch_size = std::min (_batch_size, _len);
135162 _batch_size = trimmed_length_safe_utf8 ((const char *)_buf, _batch_size);
136163 int stage1_is_ok = best_stage1 (_buf, _batch_size, pj, true );
@@ -144,51 +171,27 @@ int JsonStream::json_parse(ParsedJson &pj) {
144171 return pj.error_code ;
145172 }
146173 pj.n_structural_indexes = last_index + 1 ;
147- #endif
148174 load_next_batch = false ;
149-
150- }
151- // #define SIMDJSON_IREALLYNEEDHELP
152- #ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
153- size_t oldnext_json = next_json;
154- #endif
175+ } // load_next_batch
155176 int res = best_stage2 (_buf, _len, pj, next_json);
156- #ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
157- int sizeofdoc = pj.structural_indexes [next_json]-pj.structural_indexes [oldnext_json];
158- printf (" size = %d\n " , sizeofdoc);
159- if (sizeofdoc > 0 ) {
160- printf (" %.*s\n " ,sizeofdoc, _buf + pj.structural_indexes [oldnext_json]);
161- } else {
162- printf (" <empty>\n " );
163- }
164- #endif
165-
166- if (res == simdjson::SUCCESS_AND_HAS_MORE) {
167- error_on_last_attempt = false ;
177+ if (likely (res == simdjson::SUCCESS_AND_HAS_MORE)) {
168178 n_parsed_docs++;
169179 current_buffer_loc = pj.structural_indexes [next_json];
170180 } else if (res == simdjson::SUCCESS) {
171- error_on_last_attempt = false ;
172181 n_parsed_docs++;
173182 if (_len > _batch_size) {
174183 current_buffer_loc = pj.structural_indexes [next_json - 1 ];
175- #ifndef SIMDJSON_THREADS_ENABLED
176184 next_json = 1 ;
177- #endif
178185 load_next_batch = true ;
179186 res = simdjson::SUCCESS_AND_HAS_MORE;
180187 }
181188 }
182- // We assume the error is because the json was not loaded completely in this batch.
183- // Load a new batch and if the error persists, it's a genuine error.
184- else if (!error_on_last_attempt) {
185- load_next_batch = true ;
186- error_on_last_attempt = true ;
187- res = json_parse (pj);
188- }
189189 return res;
190190}
191191
192+ #endif // SIMDJSON_THREADS_ENABLED
193+
194+
// Read-only accessor (const member function): returns the current value of
// the current_buffer_loc member. json_parse stores a value taken from
// pj.structural_indexes into that member after a successful parse that
// leaves more input to process.
192195size_t JsonStream::get_current_buffer_loc () const {
193196 return current_buffer_loc;
194197}
0 commit comments