@@ -26,17 +26,16 @@ create or replace type body ut_output_table_buffer is
2626 pragma autonomous_transaction;
2727 begin
2828 if a_text is not null or a_item_type is not null then
29- if length (a_text) > ut_utils.gc_max_storage_varchar2_len then
29+ if lengthb (a_text) > ut_utils.gc_max_storage_varchar2_len then
3030 self.send_lines(
3131 ut_utils.convert_collection(
3232 ut_utils.clob_to_table(a_text, ut_utils.gc_max_storage_varchar2_len)
3333 ),
3434 a_item_type
3535 );
3636 else
37- self.last_write_message_id := self.last_write_message_id + 1;
3837 insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type)
39- values (self.output_id, self.last_write_message_id , a_text, a_item_type);
38+ values (self.output_id, ut_output_buffer_tmp_seq.nextval , a_text, a_item_type);
4039 end if;
4140 commit;
4241 end if;
@@ -46,28 +45,26 @@ create or replace type body ut_output_table_buffer is
4645 pragma autonomous_transaction;
4746 begin
4847 insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type)
49- select /*+ no_parallel */ self.output_id, self.last_write_message_id + rownum , t.column_value, a_item_type
48+ select /*+ no_parallel */ self.output_id, ut_output_buffer_tmp_seq.nextval , t.column_value, a_item_type
5049 from table(a_text_list) t
5150 where t.column_value is not null or a_item_type is not null;
52- self.last_write_message_id := self.last_write_message_id + SQL%rowcount;
5351 commit;
5452 end;
5553
5654 overriding member procedure send_clob(self in out nocopy ut_output_table_buffer, a_text clob, a_item_type varchar2 := null) is
5755 pragma autonomous_transaction;
5856 begin
5957 if a_text is not null and a_text != empty_clob() or a_item_type is not null then
60- if length (a_text) > ut_utils.gc_max_storage_varchar2_len then
58+ if ut_utils.lengthb_clob (a_text) > ut_utils.gc_max_storage_varchar2_len then
6159 self.send_lines(
6260 ut_utils.convert_collection(
6361 ut_utils.clob_to_table(a_text, ut_utils.gc_max_storage_varchar2_len)
6462 ),
6563 a_item_type
6664 );
6765 else
68- self.last_write_message_id := self.last_write_message_id + 1;
6966 insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type)
70- values (self.output_id, self.last_write_message_id , a_text, a_item_type);
67+ values (self.output_id, ut_output_buffer_tmp_seq.nextval , a_text, a_item_type);
7168 end if;
7269 commit;
7370 end if;
@@ -99,7 +96,6 @@ create or replace type body ut_output_table_buffer is
9996 l_buffer_texts ut_varchar2_rows;
10097 l_buffer_item_types ut_varchar2_rows;
10198 l_finished_flags ut_integer_list;
102- l_last_read_message_id integer;
10399 l_already_waited_sec number(10,2) := 0;
104100 l_finished boolean := false;
105101 l_sleep_time number(2,1);
@@ -108,33 +104,30 @@ create or replace type body ut_output_table_buffer is
108104 l_producer_finished boolean := false;
109105
110106 procedure get_data_from_buffer_table(
111- a_last_read_message_id in out nocopy integer,
112107 a_buffer_texts out nocopy ut_varchar2_rows,
113108 a_buffer_item_types out nocopy ut_varchar2_rows,
114109 a_finished_flags out nocopy ut_integer_list
115110 ) is
116111 lc_bulk_limit constant integer := 20000;
117112 pragma autonomous_transaction;
118113 begin
119- a_last_read_message_id := coalesce(a_last_read_message_id,0);
120114 delete /*+ no_parallel */ from (
121115 select /*+ no_parallel */ *
122- from ut_output_buffer_tmp o
123- where o .output_id = self.output_id
124- and o .message_id <= a_last_read_message_id + lc_bulk_limit
125- order by o .message_id
116+ from ut_output_buffer_tmp a
117+ where a .output_id = self.output_id
118+ and a .message_id <= (select min(message_id) from ut_output_buffer_tmp o where o.output_id = self.output_id) + lc_bulk_limit
119+ order by a .message_id
126120 ) d
127121 returning d.text, d.item_type, d.is_finished
128122 bulk collect into a_buffer_texts, a_buffer_item_types, a_finished_flags;
129- a_last_read_message_id := a_last_read_message_id + a_finished_flags.count;
130123 commit;
131124 end;
132125 begin
133126 while not l_finished loop
134127
135128 l_sleep_time := case when l_already_waited_sec >= 1 then 0.5 else 0.1 end;
136129 l_lock_status := self.get_lock_status();
137- get_data_from_buffer_table( l_last_read_message_id, l_buffer_texts, l_buffer_item_types, l_finished_flags );
130+ get_data_from_buffer_table( l_buffer_texts, l_buffer_item_types, l_finished_flags );
138131
139132 if l_buffer_texts.count > 0 then
140133 l_already_waited_sec := 0;
0 commit comments