CREATE FUNCTION function-name(parameter-name ref-cursor-type)
RETURN rec_tab_type PIPELINED
PARALLEL_ENABLE(PARTITION parameter-name BY (HASH l RANGE) (column-list) l ANY) IS
BEGIN
...
END;
SQL> set serveroutput off
SQL> Create table t1 as
2 select object_id id, object_name text
3 from all_objects;
테이블이 생성되었습니다.
SQL> Create table t2 as
2 select t1.*, 0 session_id
3 from t1
4 where 1=0;
테이블이 생성되었습니다.
SQL> Create table t3 as
2 select t1.*, 0 session_id
3 from t1;
테이블이 생성되었습니다.
SQL> alter table t1 add constraints t1_pk primary key(id);
테이블이 변경되었습니다.
SQL> alter table t3 add constraints t3_pk primary key(id);
테이블이 변경되었습니다.
SQL> Begin
2 dbms_stats.set_table_stats( user, 'T1', numrows=>10000000, numblks=>100000);
3 end;
4 /
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> Begin
2 dbms_stats.set_table_stats( user, 'T3', numrows=>10000000, numblks=>100000);
3 end;
4 /
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> -- 일반적인 배치process
SQL> Create or replace procedure process_data as
2 begin
3 for x in (select * from t1)
4 loop
5 -- 복잡한 처리대신 심플 DML로 진행
6 update t3 set id = id + 10 where id = x.id;
7 end loop;
8 end;
9 /
프로시저가 생성되었습니다.
SQL> alter session set statistics_level=all;
세션이 변경되었습니다.
SQL> exec process_data;
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> create or replace type t2_type as object (
2 id number,
3 text varchar2(30),
4 session_id number);
5 /
유형이 생성되었습니다.
SQL> Create or replace type t2_tab_type as table of t2_type;
2 /
유형이 생성되었습니다.
SQL> Create or replace function parallel_pipelined(l_cursor in sys_refcursor)
2 return t2_tab_type
3 pipelined
4 parallel_enable(partition l_cursor by any) is
5 l_session_id number;
6 l_rec t1%rowtype;
7 begin
8 select sid into l_session_id
9 from v$mystat
10 where rownum =1;
11 loop
12 fetch l_cursor into l_rec;
13 exit when l_cursor%notfound;
14 -- 복잡한 처리대신 심플 DML로 진행
15 update t3 set id = id + 10 where id = l_rec.id;
16 pipe row(t2_type(l_rec.id, l_rec.text, l_session_id));
17 end loop;
18 close l_cursor;
19 return;
20 end;
21 /
함수가 생성되었습니다.
SQL> alter session enable parallel dml;
세션이 변경되었습니다.
SQL> insert /*+ append */ into t2(id, text, session_id)
2 select * from table(parallel_pipelined(cursor(select /*+ parallel(t1) */ * from t1)))
3 /
insert /*+ append */ into t2(id, text, session_id)
*
1행에 오류:
ORA-12801: 병렬 질의 서버 P000에 오류신호가 발생했습니다
ORA-14551: 질의 안에 DML 작업을 수행할 수 없습니다
ORA-06512: "WOONG.PARALLEL_PIPELINED", 15행
Create or replace function parallel_pipelined(l_cursor in sys_refcursor)
return t2_tab_type
pipelined
parallel_enable(partition l_cursor by any) is
l_session_id number;
l_rec t1%rowtype;
l_id number;
begin
select sid into l_session_id
from v$mystat
where rownum =1;
loop
fetch l_cursor into l_rec;
exit when l_cursor%notfound;
-- DML대신 Query로
select id + 10 into l_id from t3 where id = l_rec.id;
pipe row(t2_type(l_rec.id, l_rec.text, l_session_id));
end loop;
close l_cursor;
return;
end;
/
SQL> insert /*+ append */ into t2(id, text, session_id)
2 select * from table(parallel_pipelined(cursor(select * from t1)));
71483 개의 행이 만들어졌습니다.
SQL> commit;
커밋이 완료되었습니다.
SQL> select session_id, count(*) from t2 group by session_id;
SESSION_ID COUNT(*)
---------- ----------
190 71483
SQL> truncate table t2
2 /
테이블이 잘렸습니다.
SQL> insert /*+ append */ into t2(id, text, session_id)
2 select * from table(parallel_pipelined(cursor(select /*+ parallel(t1 4) */ * from t1)));
71483 개의 행이 만들어졌습니다.
SQL> commit;
커밋이 완료되었습니다.
SQL> select session_id, count(*) from t2 group by session_id;
SESSION_ID COUNT(*)
---------- ----------
66 16312
224 20004
161 18845
192 16322
SQL> TEST환경 만들기
SQL> CREATE TABLE test_tab (
2 id NUMBER,
3 description VARCHAR2(50),
4 num_col NUMBER,
5 session_id NUMBER,
6 CONSTRAINT test_tab_pk PRIMARY KEY (id)
7 );
테이블이 생성되었습니다.
SQL>
SQL> INSERT /*+ APPEND */ INTO test_tab
2 SELECT level,
3 'Description for ' || level,
4 CASE
5 WHEN MOD(level, 5) = 0 THEN 10
6 WHEN MOD(level, 3) = 0 THEN 20
7 ELSE 30
8 END,
9 NULL
10 FROM dual
11 CONNECT BY level <= 500000;
500000 개의 행이 만들어졌습니다.
SQL> COMMIT;
커밋이 완료되었습니다.
SQL>
SQL> EXEC DBMS_STATS.gather_table_stats(USER, 'TEST_TAB', cascade => TRUE);
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL>
SQL> SELECT num_col, COUNT(*)
2 FROM test_tab
3 GROUP BY num_col
4 ORDER BY num_col;
NUM_COL COUNT(*)
---------- ----------
10 100000
20 133333
30 266667
SQL> -- TASK 생성하기
SQL> BEGIN
2 DBMS_PARALLEL_EXECUTE.create_task (task_name => 'test_task');
3 END;
4 /
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> COLUMN task_name FORMAT A10
SQL> SELECT task_name,
2 status
3 FROM user_parallel_execute_tasks;
TASK_NAME STATUS
---------- -------------------
test_task CREATED
SQL> SELECT DBMS_PARALLEL_EXECUTE.generate_task_name
2 FROM dual;
GENERATE_TASK_NAME
--------------------------------------------------------------------------------
TASK$_202
SQL> -- 작업대상 청크를 rowid로 나누기 CREATE_CHUNKS_BY_ROWID
SQL> BEGIN
2 DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name => 'test_task',
3 table_owner => 'WOONG',
4 table_name => 'TEST_TAB',
5 by_row => TRUE,
6 chunk_size => 10000);
7 END;
8 /
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> SELECT chunk_id, status, start_rowid, end_rowid
2 FROM user_parallel_execute_chunks
3 WHERE task_name = 'test_task'
4 ORDER BY chunk_id;
CHUNK_ID STATUS START_ROWID END_ROWID
---------- -------------------- ------------------ ------------------
203 UNASSIGNED AAASNmAAGAAAqGQAAA AAASNmAAGAAAqGXCcP
204 UNASSIGNED AAASNmAAGAAAqGgAAA AAASNmAAGAAAqGnCcP
205 UNASSIGNED AAASNmAAGAAAqGoAAA AAASNmAAGAAAqGvCcP
206 UNASSIGNED AAASNmAAGAAAqGwAAA AAASNmAAGAAAqG3CcP
...
270 UNASSIGNED AAASNmAAGAAAqsAAAA AAASNmAAGAAAqsxCcP
271 UNASSIGNED AAASNmAAGAAAqsyAAA AAASNmAAGAAAqtjCcP
272 UNASSIGNED AAASNmAAGAAAqtkAAA AAASNmAAGAAAqt/CcP
273 UNASSIGNED AAASNmAAGAAAquAAAA AAASNmAAGAAAquxCcP
274 UNASSIGNED AAASNmAAGAAAquyAAA AAASNmAAGAAAqvjCcP
275 UNASSIGNED AAASNmAAGAAAqvkAAA AAASNmAAGAAAqv/CcP
73 개의 행이 선택되었습니다.
SQL> -- TASK 실행하기
SQL> DECLARE
2 l_sql_stmt VARCHAR2(32767);
3 BEGIN
4 l_sql_stmt := 'UPDATE test_tab t
5 SET t.num_col = t.num_col + 10,
6 t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'')
7 WHERE rowid BETWEEN :start_id AND :end_id';
8
9 DBMS_PARALLEL_EXECUTE.run_task(task_name => 'test_task',
10 sql_stmt => l_sql_stmt,
11 language_flag => DBMS_SQL.NATIVE,
12 parallel_level => 10);
13 END;
14 /
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> -- 각 세션별 처리 일량 확인
SQL> SELECT session_id, COUNT(*)
2 FROM test_tab
3 GROUP BY session_id
4 ORDER BY session_id;
SESSION_ID COUNT(*)
---------- ----------
40429 89070
40430 62188
40431 53153
40432 49902
40433 35214
40434 58915
40435 40718
40436 43808
40437 33674
40438 33358
10 개의 행이 선택되었습니다.
SQL> truncate table t2;
테이블이 잘렸습니다.
SQL> Create table job_params (
2 job number primary key,
3 lo_rid rowid,
4 hi_rid rowid);
테이블이 생성되었습니다.
SQL> create or replace procedure serial (p_job in number)
2 is
3 l_rec job_params%rowtype;
4 begin
5 select * into l_rec
6 from job_params
7 where job = p_job ;
8
9 for x in (select id, text
10 from t1
11 where rowid between l_rec.lo_rid and l_rec.hi_rid)
12 loop
13 -- 복잡한 처리대신 심플 DML로 진행
14 update t3 set id = id + 10 where id = x.id;
15 insert into t2 (id, text, session_id)
16 values (x.id, x.text, p_job);
17 end loop;
18 delete from job_params where job = p_job;
19 end;
20 /
프로시저가 생성되었습니다.
SQL> declare
l_job number;
begin
for x in (select dbms_rowid.rowid_create(1, data_object_id, lo_fno, lo_block, 0) min_rid,
dbms_rowid.rowid_create(1,data_object_id, hi_fno, hi_block, 10000) max_rid
from (select distinct grp,
first_value(relative_fno) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) lo_fno,
first_value(block_id) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) lo_block,
last_value(relative_fno) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) hi_fno,
last_value(block_id+blocks-1) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) hi_block,
sum(blocks) over (partition by grp ) sum_blocks
from (select relative_fno,
block_id,
blocks,
trunc(sum(blocks) over (order by relative_fno, block_id)-0.01) /
(sum(blocks) over () / 8 ) grp
from dba_extents
where segment_name = upper('BIG_TABLE')
and owner = user
order by block_id)),
(select data_object_id
from user_objects
where object_name = upper('big_table'))
) -- for sql 문장
loop
dbms_job.submit(l_job, 'serial(job);');
insert into job_params(job, lo_rid, hi_rid)
values (l_job, x.min_rid, x.max_rid);
end loop;
end;
/
PL/SQL 처리가 정상적으로 완료되었습니다.
SQL> select * from job_params;
JOB LO_RID HI_RID
---------- ------------------ ------------------
23 AAASNJAAGAAAACoAAA AAASNJAAGAAAACvCcQ
24 AAASNJAAGAAAADYAAA AAASNJAAGAAAADfCcQ
25 AAASNJAAGAAAADwAAA AAASNJAAGAAAAD3CcQ
26 AAASNJAAGAAAAWAAAA AAASNJAAGAAAAX/CcQ
27 AAASNJAAGAAABIAAAA AAASNJAAGAAABJ/CcQ
...
221 AAASNJAAGAAAbSAAAA AAASNJAAGAAAbh/CcQ
222 AAASNJAAGAAAbyAAAA AAASNJAAGAAAcB/CcQ
223 AAASNJAAGAAAeiAAAA AAASNJAAGAAAex/CcQ
201 개의 행이 선택되었습니다.
SQL> commit;
커밋이 완료되었습니다.
SQL>
- 강좌 URL : http://www.gurubee.net/lecture/4068
- 구루비 강좌는 개인의 학습용으로만 사용 할 수 있으며, 다른 웹 페이지에 게재할 경우에는 출처를 꼭 밝혀 주시면 고맙겠습니다.~^^
- 구루비 강좌는 서비스 제공을 위한 목적이나, 학원 홍보, 수익을 얻기 위한 용도로 사용 할 수 없습니다.