@@ -89,6 +89,7 @@ create or replace type body ut_output_table_buffer is
8989 lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long
9090 lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec
9191 l_sleep_time number(2,1) := lc_short_sleep_time;
92+ lc_bulk_limit constant integer := 100;
9293
9394 procedure remove_read_data(a_message_ids ut_integer_list) is
9495 pragma autonomous_transaction;
@@ -99,13 +100,26 @@ create or replace type body ut_output_table_buffer is
99100 commit;
100101 end;
101102
102- begin
103- loop
104- select a.message_id, ut_output_data_row(a.text, a.item_type)
103+ procedure remove_buffer_info is
104+ pragma autonomous_transaction;
105+ begin
106+ delete from ut_output_buffer_info_tmp a
107+ where a.output_id = self.output_id;
108+ commit;
109+ end;
110+
111+ begin
112+ while not l_finished loop
113+ with ordered_buffer as (
114+ select a.message_id, ut_output_data_row(a.text, a.item_type)
115+ from ut_output_buffer_tmp a
116+ where a.output_id = self.output_id
117+ order by a.message_id
118+ )
119+ select b.*
105120 bulk collect into l_message_ids, l_buffer_data
106- from ut_output_buffer_tmp a
107- where a.output_id = self.output_id
108- order by a.message_id;
121+ from ordered_buffer b
122+ where rownum <= lc_bulk_limit;
109123
110124 --nothing fetched from output, wait and try again
111125 if l_buffer_data.count = 0 then
@@ -130,7 +144,12 @@ create or replace type body ut_output_table_buffer is
130144 end loop;
131145 end if;
132146 remove_read_data(l_message_ids);
133- exit when l_already_waited_for >= l_wait_for or l_finished;
147+ if l_finished then
148+ remove_buffer_info();
149+ end if;
150+ if l_already_waited_for >= l_wait_for then
151+ l_finished := true;
152+ end if;
134153 end loop;
135154 return;
136155 end;
0 commit comments