|
| 1 | +create or replace type body ut_output_clob_table_buffer is |
| 2 | + /* |
| 3 | + utPLSQL - Version 3 |
| 4 | + Copyright 2016 - 2018 utPLSQL Project |
| 5 | + |
| 6 | + Licensed under the Apache License, Version 2.0 (the "License"): |
| 7 | + you may not use this file except in compliance with the License. |
| 8 | + You may obtain a copy of the License at |
| 9 | + |
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + |
| 12 | + Unless required by applicable law or agreed to in writing, software |
| 13 | + distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | + See the License for the specific language governing permissions and |
| 16 | + limitations under the License. |
| 17 | + */ |
| 18 | + |
| 19 | + constructor function ut_output_clob_table_buffer(self in out nocopy ut_output_clob_table_buffer, a_output_id raw := null) return self as result is |
| 20 | + begin |
| 21 | + self.output_id := coalesce(a_output_id, sys_guid()); |
| 22 | + self.start_date := sysdate; |
| 23 | + self.last_message_id := 0; |
| 24 | + self.init(); |
| 25 | + self.cleanup_buffer(); |
| 26 | + return; |
| 27 | + end; |
| 28 | + |
| 29 | + overriding member procedure init(self in out nocopy ut_output_clob_table_buffer) is |
| 30 | + pragma autonomous_transaction; |
| 31 | + l_exists int; |
| 32 | + begin |
| 33 | + select count(*) into l_exists from ut_output_buffer_info_tmp where output_id = self.output_id; |
| 34 | + if ( l_exists > 0 ) then |
| 35 | + update ut_output_buffer_info_tmp set start_date = self.start_date where output_id = self.output_id; |
| 36 | + else |
| 37 | + insert into ut_output_buffer_info_tmp(output_id, start_date) values (self.output_id, self.start_date); |
| 38 | + end if; |
| 39 | + commit; |
| 40 | + end; |
| 41 | + |
| 42 | + overriding member procedure close(self in out nocopy ut_output_clob_table_buffer) is |
| 43 | + pragma autonomous_transaction; |
| 44 | + begin |
| 45 | + self.last_message_id := self.last_message_id + 1; |
| 46 | + insert into ut_output_clob_buffer_tmp(output_id, message_id, is_finished) |
| 47 | + values (self.output_id, self.last_message_id, 1); |
| 48 | + commit; |
| 49 | + end; |
| 50 | + |
| 51 | + overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null) is |
| 52 | + pragma autonomous_transaction; |
| 53 | + begin |
| 54 | + if a_text is not null or a_item_type is not null then |
| 55 | + self.last_message_id := self.last_message_id + 1; |
| 56 | + insert into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type) |
| 57 | + values (self.output_id, self.last_message_id, a_text, a_item_type); |
| 58 | + end if; |
| 59 | + commit; |
| 60 | + end; |
| 61 | + |
| 62 | + overriding member procedure send_lines(self in out nocopy ut_output_clob_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null) is |
| 63 | + pragma autonomous_transaction; |
| 64 | + begin |
| 65 | + insert into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type) |
| 66 | + select self.output_id, self.last_message_id + rownum, t.column_value, a_item_type |
| 67 | + from table(a_text_list) t |
| 68 | + where t.column_value is not null or a_item_type is not null; |
| 69 | + self.last_message_id := self.last_message_id + a_text_list.count; |
| 70 | + commit; |
| 71 | + end; |
| 72 | + |
| 73 | + overriding member procedure send_clob(self in out nocopy ut_output_clob_table_buffer, a_text clob, a_item_type varchar2 := null) is |
| 74 | + pragma autonomous_transaction; |
| 75 | + begin |
| 76 | + if a_text is not null and a_text != empty_clob() or a_item_type is not null then |
| 77 | + self.last_message_id := self.last_message_id + 1; |
| 78 | + insert into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type) |
| 79 | + values (self.output_id, self.last_message_id, a_text, a_item_type); |
| 80 | + end if; |
| 81 | + commit; |
| 82 | + end; |
| 83 | + |
| 84 | + overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is |
| 85 | + type t_rowid_tab is table of urowid; |
| 86 | + l_message_rowids t_rowid_tab; |
| 87 | + l_buffer_data ut_output_data_rows; |
| 88 | + l_finished_flags ut_integer_list; |
| 89 | + l_already_waited_for number(10,2) := 0; |
| 90 | + l_finished boolean := false; |
| 91 | + lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 ); -- 1 minute |
| 92 | + lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours |
| 93 | + l_wait_for integer := lc_init_wait_sec; |
| 94 | + lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks |
| 95 | + lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long |
| 96 | + lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec |
| 97 | + l_sleep_time number(2,1) := lc_short_sleep_time; |
| 98 | + lc_bulk_limit constant integer := 5000; |
| 99 | + l_max_message_id integer := lc_bulk_limit; |
| 100 | + |
| 101 | + procedure remove_read_data(a_message_rowids t_rowid_tab) is |
| 102 | + pragma autonomous_transaction; |
| 103 | + begin |
| 104 | + forall i in 1 .. a_message_rowids.count |
| 105 | + delete from ut_output_clob_buffer_tmp a |
| 106 | + where rowid = a_message_rowids(i); |
| 107 | + commit; |
| 108 | + end; |
| 109 | + |
| 110 | + procedure remove_buffer_info is |
| 111 | + pragma autonomous_transaction; |
| 112 | + begin |
| 113 | + delete from ut_output_buffer_info_tmp a |
| 114 | + where a.output_id = self.output_id; |
| 115 | + commit; |
| 116 | + end; |
| 117 | + |
| 118 | + begin |
| 119 | + while not l_finished loop |
| 120 | + with ordered_buffer as ( |
| 121 | + select /*+ index(a) */ a.rowid, ut_output_data_row(a.text, a.item_type), is_finished |
| 122 | + from ut_output_clob_buffer_tmp a |
| 123 | + where a.output_id = self.output_id |
| 124 | + and a.message_id <= l_max_message_id |
| 125 | + order by a.message_id |
| 126 | + ) |
| 127 | + select b.* |
| 128 | + bulk collect into l_message_rowids, l_buffer_data, l_finished_flags |
| 129 | + from ordered_buffer b; |
| 130 | + |
| 131 | + --nothing fetched from output, wait and try again |
| 132 | + if l_buffer_data.count = 0 then |
| 133 | + $if dbms_db_version.version >= 18 $then |
| 134 | + dbms_session.sleep(l_sleep_time); |
| 135 | + $else |
| 136 | + dbms_lock.sleep(l_sleep_time); |
| 137 | + $end |
| 138 | + l_already_waited_for := l_already_waited_for + l_sleep_time; |
| 139 | + if l_already_waited_for > lc_long_wait_time then |
| 140 | + l_sleep_time := lc_long_sleep_time; |
| 141 | + end if; |
| 142 | + else |
| 143 | + --reset wait time |
| 144 | + -- we wait lc_max_wait_sec for new message |
| 145 | + l_wait_for := lc_max_wait_sec; |
| 146 | + l_already_waited_for := 0; |
| 147 | + l_sleep_time := lc_short_sleep_time; |
| 148 | + for i in 1 .. l_buffer_data.count loop |
| 149 | + if l_buffer_data(i).text is not null then |
| 150 | + pipe row(l_buffer_data(i)); |
| 151 | + elsif l_finished_flags(i) = 1 then |
| 152 | + l_finished := true; |
| 153 | + exit; |
| 154 | + end if; |
| 155 | + end loop; |
| 156 | + remove_read_data(l_message_rowids); |
| 157 | + l_max_message_id := l_max_message_id + lc_bulk_limit; |
| 158 | + end if; |
| 159 | + if l_finished or l_already_waited_for >= l_wait_for then |
| 160 | + remove_buffer_info(); |
| 161 | + if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then |
| 162 | + raise_application_error( |
| 163 | + ut_utils.gc_out_buffer_timeout, |
| 164 | + 'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.' |
| 165 | + ); |
| 166 | + end if; |
| 167 | + end if; |
| 168 | + end loop; |
| 169 | + return; |
| 170 | + end; |
| 171 | + |
| 172 | + overriding member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor is |
| 173 | + l_lines sys_refcursor; |
| 174 | + begin |
| 175 | + open l_lines for |
| 176 | + select text, item_type |
| 177 | + from table(self.get_lines(a_initial_timeout, a_timeout_sec)); |
| 178 | + return l_lines; |
| 179 | + end; |
| 180 | + |
| 181 | + overriding member procedure lines_to_dbms_output(self in ut_output_clob_table_buffer, a_initial_timeout natural := null, a_timeout_sec natural := null) is |
| 182 | + l_data sys_refcursor; |
| 183 | + l_clob clob; |
| 184 | + l_item_type varchar2(32767); |
| 185 | + l_lines ut_varchar2_list; |
| 186 | + begin |
| 187 | + l_data := self.get_lines_cursor(a_initial_timeout, a_timeout_sec); |
| 188 | + loop |
| 189 | + fetch l_data into l_clob, l_item_type; |
| 190 | + exit when l_data%notfound; |
| 191 | + l_lines := ut_utils.clob_to_table(l_clob); |
| 192 | + for i in 1 .. l_lines.count loop |
| 193 | + dbms_output.put_line(l_lines(i)); |
| 194 | + end loop; |
| 195 | + end loop; |
| 196 | + close l_data; |
| 197 | + end; |
| 198 | + |
| 199 | + member procedure cleanup_buffer(self in ut_output_clob_table_buffer, a_retention_time_sec natural := null) is |
| 200 | + gc_buffer_retention_sec constant naturaln := coalesce(a_retention_time_sec, 60 * 60 * 24); -- 24 hours |
| 201 | + l_retention_days number := gc_buffer_retention_sec / (60 * 60 * 24); |
| 202 | + l_max_retention_date date := sysdate - l_retention_days; |
| 203 | + pragma autonomous_transaction; |
| 204 | + begin |
| 205 | + delete from ut_output_clob_buffer_tmp t |
| 206 | + where t.output_id |
| 207 | + in (select i.output_id from ut_output_buffer_info_tmp i where i.start_date <= l_max_retention_date); |
| 208 | + |
| 209 | + delete from ut_output_buffer_info_tmp i where i.start_date <= l_max_retention_date; |
| 210 | + commit; |
| 211 | + end; |
| 212 | + |
| 213 | +end; |
| 214 | +/ |
0 commit comments