@@ -49,7 +49,7 @@ create or replace type body ut_output_table_buffer is
4949 overriding member procedure send_line(self in ut_output_table_buffer, a_text varchar2, a_item_type varchar2 := null) is
5050 pragma autonomous_transaction;
5151 begin
52- if a_text is not null then
52+ if a_text is not null or a_item_type is not null then
5353 insert into ut_output_buffer_tmp(output_id, message_id, text, item_type)
5454 values (self.output_id, ut_message_id_seq.nextval, a_text, a_item_type);
5555 end if;
@@ -62,15 +62,15 @@ create or replace type body ut_output_table_buffer is
6262 insert into ut_output_buffer_tmp(output_id, message_id, text, item_type)
6363 select self.output_id, ut_message_id_seq.nextval, t.column_value, a_item_type
6464 from table(a_text_list) t
65- where t.column_value is not null;
65+ where t.column_value is not null or a_item_type is not null ;
6666
6767 commit;
6868 end;
6969
7070 overriding member procedure send_clob(self in ut_output_table_buffer, a_text clob, a_item_type varchar2 := null) is
7171 pragma autonomous_transaction;
7272 begin
73- if a_text is not null and a_text != empty_clob() then
73+ if a_text is not null and a_text != empty_clob() or a_item_type is not null then
7474 insert into ut_output_buffer_tmp(output_id, message_id, text, item_type)
7575 values (self.output_id, ut_message_id_seq.nextval, a_text, a_item_type);
7676 end if;
@@ -80,9 +80,10 @@ create or replace type body ut_output_table_buffer is
8080 overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is
8181 l_buffer_data ut_output_data_rows;
8282 l_message_ids ut_integer_list;
83+ l_finished_flags ut_integer_list;
8384 l_already_waited_for number(10,2) := 0;
8485 l_finished boolean := false;
85- lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 * 60 * 4 ); -- 4 hours
86+ lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 15 ); -- 15 seconds
8687 lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours
8788 l_wait_for integer := lc_init_wait_sec;
8889 lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks
@@ -111,13 +112,13 @@ create or replace type body ut_output_table_buffer is
111112 begin
112113 while not l_finished loop
113114 with ordered_buffer as (
114- select a.message_id, ut_output_data_row(a.text, a.item_type)
115+ select a.message_id, ut_output_data_row(a.text, a.item_type), is_finished
115116 from ut_output_buffer_tmp a
116117 where a.output_id = self.output_id
117118 order by a.message_id
118119 )
119120 select b.*
120- bulk collect into l_message_ids, l_buffer_data
121+ bulk collect into l_message_ids, l_buffer_data, l_finished_flags
121122 from ordered_buffer b
122123 where rownum <= lc_bulk_limit;
123124
@@ -141,18 +142,21 @@ create or replace type body ut_output_table_buffer is
141142 for i in 1 .. l_buffer_data.count loop
142143 if l_buffer_data(i).text is not null then
143144 pipe row(l_buffer_data(i));
144- else
145+ elsif l_finished_flags(i) = 1 then
145146 l_finished := true;
146147 exit;
147148 end if;
148149 end loop;
150+ remove_read_data(l_message_ids);
149151 end if;
150- remove_read_data(l_message_ids);
151- if l_finished then
152+ if l_finished or l_already_waited_for >= l_wait_for then
152153 remove_buffer_info();
153- end if;
154- if l_already_waited_for >= l_wait_for then
155- l_finished := true;
154+ if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then
155+ raise_application_error(
156+ ut_utils.gc_out_buffer_timeout,
157+ 'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.'
158+ );
159+ end if;
156160 end if;
157161 end loop;
158162 return;
0 commit comments