Skip to content

Commit e1119c6

Browse files
committed
Split output buffers into two.
Now we have separate buffer for CLOB data and VARCHAR2 text data.
1 parent 4d00c33 commit e1119c6

14 files changed

Lines changed: 412 additions & 44 deletions

source/core/output_buffers/ut_output_buffer_tmp.sql

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,27 @@ begin
2424
*/
2525
output_id raw(32) not null,
2626
message_id number(38,0) not null,
27-
text clob,
27+
text varchar2(4000),
2828
item_type varchar2(1000),
2929
is_finished number(1,0) default 0 not null,
3030
constraint ut_output_buffer_tmp_pk primary key(output_id, message_id),
3131
constraint ut_output_buffer_tmp_ck check(
3232
is_finished = 0 and (text is not null or item_type is not null )
3333
or is_finished = 1 and text is null and item_type is null ),
3434
constraint ut_output_buffer_fk1 foreign key (output_id) references ut_output_buffer_info_tmp$(output_id)
35-
) nologging initrans 100
35+
) organization index nologging initrans 100
36+
overflow nologging initrans 100
3637
';
37-
begin
38-
execute immediate
39-
v_table_sql || 'lob(text) store as securefile ut_output_text(retention none enable storage in row)';
40-
exception
41-
when e_non_assm then
42-
execute immediate
43-
v_table_sql || 'lob(text) store as basicfile ut_output_text(pctversion 0 enable storage in row)';
44-
45-
end;
38+
execute immediate v_table_sql;
39+
-- begin
40+
-- execute immediate
41+
-- v_table_sql || 'lob(text) store as securefile ut_output_text(retention none enable storage in row)';
42+
-- exception
43+
-- when e_non_assm then
44+
-- execute immediate
45+
-- v_table_sql || 'lob(text) store as basicfile ut_output_text(pctversion 0 enable storage in row)';
46+
--
47+
-- end;
4648
end;
4749
/
4850

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
declare
2+
v_table_sql varchar2(32767);
3+
e_non_assm exception;
4+
pragma exception_init(e_non_assm, -43853);
5+
begin
6+
v_table_sql := 'create table ut_output_clob_buffer_tmp$(
7+
/*
8+
utPLSQL - Version 3
9+
Copyright 2016 - 2018 utPLSQL Project
10+
Licensed under the Apache License, Version 2.0 (the "License"):
11+
you may not use this file except in compliance with the License.
12+
You may obtain a copy of the License at
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
Unless required by applicable law or agreed to in writing, software
15+
distributed under the License is distributed on an "AS IS" BASIS,
16+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
See the License for the specific language governing permissions and
18+
limitations under the License.
19+
*/
20+
/*
21+
* This table is not a global temporary table as it needs to allow cross-session data exchange
22+
* It is used however as a temporary table with multiple writers.
23+
* This is why it has very high initrans and has nologging
24+
*/
25+
output_id raw(32) not null,
26+
message_id number(38,0) not null,
27+
text clob,
28+
item_type varchar2(1000),
29+
is_finished number(1,0) default 0 not null,
30+
constraint ut_output_clob_buffer_tmp_pk primary key(output_id, message_id),
31+
constraint ut_output_clob_buffer_tmp_ck check(
32+
is_finished = 0 and (text is not null or item_type is not null )
33+
or is_finished = 1 and text is null and item_type is null ),
34+
constraint ut_output_clob_buffer_tmp_fk1 foreign key (output_id) references ut_output_buffer_info_tmp$(output_id)
35+
) nologging initrans 100
36+
';
37+
begin
38+
execute immediate
39+
v_table_sql || 'lob(text) store as securefile ut_output_text(retention none enable storage in row)';
40+
exception
41+
when e_non_assm then
42+
execute immediate
43+
v_table_sql || 'lob(text) store as basicfile ut_output_text(pctversion 0 enable storage in row)';
44+
45+
end;
46+
end;
47+
/
48+
49+
-- This is needed to be EBR ready as editioning view can only be created by edition enabled user
50+
declare
51+
ex_nonedition_user exception;
52+
ex_view_doesnt_exist exception;
53+
pragma exception_init(ex_nonedition_user,-42314);
54+
pragma exception_init(ex_view_doesnt_exist,-942);
55+
v_view_source varchar2(32767);
56+
begin
57+
begin
58+
execute immediate 'drop view ut_output_clob_buffer_tmp';
59+
exception
60+
when ex_view_doesnt_exist then
61+
null;
62+
end;
63+
v_view_source := ' ut_output_clob_buffer_tmp as
64+
/*
65+
utPLSQL - Version 3
66+
Copyright 2016 - 2018 utPLSQL Project
67+
Licensed under the Apache License, Version 2.0 (the "License"):
68+
you may not use this file except in compliance with the License.
69+
You may obtain a copy of the License at
70+
http://www.apache.org/licenses/LICENSE-2.0
71+
Unless required by applicable law or agreed to in writing, software
72+
distributed under the License is distributed on an "AS IS" BASIS,
73+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
74+
See the License for the specific language governing permissions and
75+
limitations under the License.
76+
*/
77+
select output_id
78+
,message_id
79+
,text
80+
,item_type
81+
,is_finished
82+
from ut_output_clob_buffer_tmp$';
83+
84+
execute immediate 'create or replace editioning view '||v_view_source;
85+
exception
86+
when ex_nonedition_user then
87+
execute immediate 'create or replace view '||v_view_source;
88+
end;
89+
/
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
create or replace type body ut_output_clob_table_buffer is
2+
/*
3+
utPLSQL - Version 3
4+
Copyright 2016 - 2018 utPLSQL Project
5+
6+
Licensed under the Apache License, Version 2.0 (the "License"):
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
constructor function ut_output_clob_table_buffer(self in out nocopy ut_output_clob_table_buffer, a_output_id raw := null) return self as result is
20+
begin
21+
self.output_id := coalesce(a_output_id, sys_guid());
22+
self.start_date := sysdate;
23+
self.last_message_id := 0;
24+
self.init();
25+
self.cleanup_buffer();
26+
return;
27+
end;
28+
29+
overriding member procedure init(self in out nocopy ut_output_clob_table_buffer) is
30+
pragma autonomous_transaction;
31+
l_exists int;
32+
begin
33+
select count(*) into l_exists from ut_output_buffer_info_tmp where output_id = self.output_id;
34+
if ( l_exists > 0 ) then
35+
update ut_output_buffer_info_tmp set start_date = self.start_date where output_id = self.output_id;
36+
else
37+
insert into ut_output_buffer_info_tmp(output_id, start_date) values (self.output_id, self.start_date);
38+
end if;
39+
commit;
40+
end;
41+
42+
overriding member procedure close(self in out nocopy ut_output_clob_table_buffer) is
43+
pragma autonomous_transaction;
44+
begin
45+
self.last_message_id := self.last_message_id + 1;
46+
insert into ut_output_clob_buffer_tmp(output_id, message_id, is_finished)
47+
values (self.output_id, self.last_message_id, 1);
48+
commit;
49+
end;
50+
51+
overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null) is
52+
pragma autonomous_transaction;
53+
begin
54+
if a_text is not null or a_item_type is not null then
55+
self.last_message_id := self.last_message_id + 1;
56+
insert into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type)
57+
values (self.output_id, self.last_message_id, a_text, a_item_type);
58+
end if;
59+
commit;
60+
end;
61+
62+
overriding member procedure send_lines(self in out nocopy ut_output_clob_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null) is
63+
pragma autonomous_transaction;
64+
begin
65+
insert into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type)
66+
select self.output_id, self.last_message_id + rownum, t.column_value, a_item_type
67+
from table(a_text_list) t
68+
where t.column_value is not null or a_item_type is not null;
69+
self.last_message_id := self.last_message_id + a_text_list.count;
70+
commit;
71+
end;
72+
73+
overriding member procedure send_clob(self in out nocopy ut_output_clob_table_buffer, a_text clob, a_item_type varchar2 := null) is
74+
pragma autonomous_transaction;
75+
begin
76+
if a_text is not null and a_text != empty_clob() or a_item_type is not null then
77+
self.last_message_id := self.last_message_id + 1;
78+
insert into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type)
79+
values (self.output_id, self.last_message_id, a_text, a_item_type);
80+
end if;
81+
commit;
82+
end;
83+
84+
overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is
85+
type t_rowid_tab is table of urowid;
86+
l_message_rowids t_rowid_tab;
87+
l_buffer_data ut_output_data_rows;
88+
l_finished_flags ut_integer_list;
89+
l_already_waited_for number(10,2) := 0;
90+
l_finished boolean := false;
91+
lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 ); -- 1 minute
92+
lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours
93+
l_wait_for integer := lc_init_wait_sec;
94+
lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks
95+
lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long
96+
lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec
97+
l_sleep_time number(2,1) := lc_short_sleep_time;
98+
lc_bulk_limit constant integer := 5000;
99+
l_max_message_id integer := lc_bulk_limit;
100+
101+
procedure remove_read_data(a_message_rowids t_rowid_tab) is
102+
pragma autonomous_transaction;
103+
begin
104+
forall i in 1 .. a_message_rowids.count
105+
delete from ut_output_clob_buffer_tmp a
106+
where rowid = a_message_rowids(i);
107+
commit;
108+
end;
109+
110+
procedure remove_buffer_info is
111+
pragma autonomous_transaction;
112+
begin
113+
delete from ut_output_buffer_info_tmp a
114+
where a.output_id = self.output_id;
115+
commit;
116+
end;
117+
118+
begin
119+
while not l_finished loop
120+
with ordered_buffer as (
121+
select /*+ index(a) */ a.rowid, ut_output_data_row(a.text, a.item_type), is_finished
122+
from ut_output_clob_buffer_tmp a
123+
where a.output_id = self.output_id
124+
and a.message_id <= l_max_message_id
125+
order by a.message_id
126+
)
127+
select b.*
128+
bulk collect into l_message_rowids, l_buffer_data, l_finished_flags
129+
from ordered_buffer b;
130+
131+
--nothing fetched from output, wait and try again
132+
if l_buffer_data.count = 0 then
133+
$if dbms_db_version.version >= 18 $then
134+
dbms_session.sleep(l_sleep_time);
135+
$else
136+
dbms_lock.sleep(l_sleep_time);
137+
$end
138+
l_already_waited_for := l_already_waited_for + l_sleep_time;
139+
if l_already_waited_for > lc_long_wait_time then
140+
l_sleep_time := lc_long_sleep_time;
141+
end if;
142+
else
143+
--reset wait time
144+
-- we wait lc_max_wait_sec for new message
145+
l_wait_for := lc_max_wait_sec;
146+
l_already_waited_for := 0;
147+
l_sleep_time := lc_short_sleep_time;
148+
for i in 1 .. l_buffer_data.count loop
149+
if l_buffer_data(i).text is not null then
150+
pipe row(l_buffer_data(i));
151+
elsif l_finished_flags(i) = 1 then
152+
l_finished := true;
153+
exit;
154+
end if;
155+
end loop;
156+
remove_read_data(l_message_rowids);
157+
l_max_message_id := l_max_message_id + lc_bulk_limit;
158+
end if;
159+
if l_finished or l_already_waited_for >= l_wait_for then
160+
remove_buffer_info();
161+
if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then
162+
raise_application_error(
163+
ut_utils.gc_out_buffer_timeout,
164+
'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.'
165+
);
166+
end if;
167+
end if;
168+
end loop;
169+
return;
170+
end;
171+
172+
overriding member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor is
173+
l_lines sys_refcursor;
174+
begin
175+
open l_lines for
176+
select text, item_type
177+
from table(self.get_lines(a_initial_timeout, a_timeout_sec));
178+
return l_lines;
179+
end;
180+
181+
overriding member procedure lines_to_dbms_output(self in ut_output_clob_table_buffer, a_initial_timeout natural := null, a_timeout_sec natural := null) is
182+
l_data sys_refcursor;
183+
l_clob clob;
184+
l_item_type varchar2(32767);
185+
l_lines ut_varchar2_list;
186+
begin
187+
l_data := self.get_lines_cursor(a_initial_timeout, a_timeout_sec);
188+
loop
189+
fetch l_data into l_clob, l_item_type;
190+
exit when l_data%notfound;
191+
l_lines := ut_utils.clob_to_table(l_clob);
192+
for i in 1 .. l_lines.count loop
193+
dbms_output.put_line(l_lines(i));
194+
end loop;
195+
end loop;
196+
close l_data;
197+
end;
198+
199+
member procedure cleanup_buffer(self in ut_output_clob_table_buffer, a_retention_time_sec natural := null) is
200+
gc_buffer_retention_sec constant naturaln := coalesce(a_retention_time_sec, 60 * 60 * 24); -- 24 hours
201+
l_retention_days number := gc_buffer_retention_sec / (60 * 60 * 24);
202+
l_max_retention_date date := sysdate - l_retention_days;
203+
pragma autonomous_transaction;
204+
begin
205+
delete from ut_output_clob_buffer_tmp t
206+
where t.output_id
207+
in (select i.output_id from ut_output_buffer_info_tmp i where i.start_date <= l_max_retention_date);
208+
209+
delete from ut_output_buffer_info_tmp i where i.start_date <= l_max_retention_date;
210+
commit;
211+
end;
212+
213+
end;
214+
/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
create or replace type ut_output_clob_table_buffer under ut_output_buffer_base (
2+
/*
3+
utPLSQL - Version 3
4+
Copyright 2016 - 2018 utPLSQL Project
5+
6+
Licensed under the Apache License, Version 2.0 (the "License"):
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
start_date date,
20+
last_message_id number(38,0),
21+
constructor function ut_output_clob_table_buffer(self in out nocopy ut_output_clob_table_buffer, a_output_id raw := null) return self as result,
22+
overriding member procedure init(self in out nocopy ut_output_clob_table_buffer),
23+
overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null),
24+
overriding member procedure send_lines(self in out nocopy ut_output_clob_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null),
25+
overriding member procedure send_clob(self in out nocopy ut_output_clob_table_buffer, a_text clob, a_item_type varchar2 := null),
26+
overriding member procedure close(self in out nocopy ut_output_clob_table_buffer),
27+
overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined,
28+
overriding member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor,
29+
overriding member procedure lines_to_dbms_output(self in ut_output_clob_table_buffer, a_initial_timeout natural := null, a_timeout_sec natural := null),
30+
member procedure cleanup_buffer(self in ut_output_clob_table_buffer, a_retention_time_sec natural := null)
31+
) not final
32+
/

0 commit comments

Comments
 (0)