절차적 병렬

절차적 병렬 (procedural parallelism)의 두 가지 형태

  • 병렬 파이프라인 함수
  • DIY(do it yourself) 병렬처리
    1. Oracle 11g r2부터 지원되는 DBMS_PARALLEL_EXECUTE를 사용하는 방법
    2. DBMS_JOB을 이용하여 직접 개발하는 방법

병렬파이프라인함수

CREATE FUNCTION function-name(parameter-name ref-cursor-type)
RETURN rec_tab_type PIPELINED
PARALLEL_ENABLE(PARTITION parameter-name BY (HASH l RANGE) (column-list) l ANY) IS
BEGIN
...
END;


SQL> set serveroutput off
SQL> Create table t1 as
  2  select object_id id, object_name text
  3  from all_objects;

테이블이 생성되었습니다.

SQL> Create table t2 as
  2  select t1.*, 0 session_id
  3  from t1
  4  where 1=0;

테이블이 생성되었습니다.

SQL> Create table t3 as
  2  select t1.*, 0 session_id
  3  from t1;

테이블이 생성되었습니다.

SQL> alter table t1 add constraints t1_pk primary key(id);

테이블이 변경되었습니다.

SQL> alter table t3 add constraints t3_pk primary key(id);

테이블이 변경되었습니다.

SQL> Begin
  2    dbms_stats.set_table_stats( user, 'T1', numrows=>10000000, numblks=>100000);
  3  end;
  4  /

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL> Begin
  2    dbms_stats.set_table_stats( user, 'T3', numrows=>10000000, numblks=>100000);
  3  end;
  4  /

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL> -- 일반적인 배치process
SQL> Create or replace procedure process_data as
  2  begin
  3  for x in (select * from t1)
  4  loop
  5  -- 복잡한 처리대신 심플 DML로 진행
    6    update t3 set id = id + 10 where id = x.id;
  7  end loop;
  8  end;
  9  /

프로시저가 생성되었습니다.

SQL> alter session set statistics_level=all;

세션이 변경되었습니다.

SQL> exec process_data;

PL/SQL 처리가 정상적으로 완료되었습니다.

  • 변경사항을 적게 하여 bulk arry processing이 가능하나 수행시간 감소가 크지 않다.
  • 이것을 파이브라인 함수로 변경하여 오라클이 적절한 병렬도를 결정하게 할 수 있다.

pipelined함수를 이용한 방법


SQL> create or replace type t2_type as object (
  2  id number,
  3  text varchar2(30),
  4  session_id number);
  5  /

유형이 생성되었습니다.

SQL> Create or replace type t2_tab_type as table of t2_type;
  2  /

유형이 생성되었습니다.

SQL> Create or replace function parallel_pipelined(l_cursor in sys_refcursor)
  2  return t2_tab_type
  3  pipelined
  4  parallel_enable(partition l_cursor by any) is
  5    l_session_id number;
  6    l_rec t1%rowtype;
  7  begin
  8  select sid into l_session_id
  9    from v$mystat
 10   where rownum =1;
 11  loop
 12    fetch l_cursor into l_rec;
 13    exit when l_cursor%notfound;
 14    -- 복잡한 처리대신 심플 DML로 진행
  15    update t3 set id = id + 10 where id = l_rec.id;
 16    pipe row(t2_type(l_rec.id, l_rec.text, l_session_id));
 17  end loop;
 18  close l_cursor;
 19  return;
 20  end;
 21  /

함수가 생성되었습니다.

SQL> alter session enable parallel dml;

세션이 변경되었습니다.

SQL> insert /*+ append */ into t2(id, text, session_id)
  2  select * from table(parallel_pipelined(cursor(select /*+ parallel(t1) */ * from t1)))
  3  /
insert /*+ append */ into t2(id, text, session_id)
*
1행에 오류:
ORA-12801: 병렬 질의 서버 P000에 오류신호가 발생했습니다
ORA-14551: 질의 안에 DML 작업을 수행할 수 없습니다
ORA-06512: "WOONG.PARALLEL_PIPELINED",  15행

Create or replace function parallel_pipelined(l_cursor in sys_refcursor)
return t2_tab_type                                                      
pipelined                                                               
parallel_enable(partition l_cursor by any) is                           
  l_session_id number;                                                  
  l_rec t1%rowtype; 
  l_id number;                                                    
begin                                                                   
select sid into l_session_id                                            
  from v$mystat                                                         
 where rownum =1;                                                       
loop                                                                    
  fetch l_cursor into l_rec;                                            
  exit when l_cursor%notfound;                                          
  -- DML대신 Query로
  select id + 10 into l_id from t3 where id = l_rec.id;
  pipe row(t2_type(l_rec.id, l_rec.text, l_session_id));                
end loop;                                                               
close l_cursor;                                                         
return;                                                                 
end;                                                                    
/   

SQL> insert /*+ append */ into t2(id, text, session_id)
  2   select * from table(parallel_pipelined(cursor(select  * from t1)));

71483 개의 행이 만들어졌습니다.

SQL> commit;

커밋이 완료되었습니다.

SQL> select session_id, count(*) from t2 group by session_id;

SESSION_ID   COUNT(*)
---------- ----------
       190      71483

SQL> truncate table t2
  2  /

테이블이 잘렸습니다.

SQL> insert /*+ append */ into t2(id, text, session_id)
  2   select * from table(parallel_pipelined(cursor(select /*+ parallel(t1 4) */ * from t1)));

71483 개의 행이 만들어졌습니다.

SQL> commit;

커밋이 완료되었습니다.

SQL> select session_id, count(*) from t2 group by session_id;

SESSION_ID   COUNT(*)
---------- ----------
        66      16312
       224      20004
       161      18845
       192      16322


DIY_11gR2이상 : DBMS_PARALLEL_EXECUTE를 사용



SQL> TEST환경 만들기
SQL> CREATE TABLE test_tab (
  2    id          NUMBER,
  3    description VARCHAR2(50),
  4    num_col     NUMBER,
  5    session_id  NUMBER,
  6    CONSTRAINT test_tab_pk PRIMARY KEY (id)
  7  );

테이블이 생성되었습니다.

SQL>
SQL> INSERT /*+ APPEND */ INTO test_tab
  2  SELECT level,
  3         'Description for ' || level,
  4         CASE
  5           WHEN MOD(level, 5) = 0 THEN 10
  6           WHEN MOD(level, 3) = 0 THEN 20
  7           ELSE 30
  8         END,
  9         NULL
 10  FROM   dual
 11  CONNECT BY level <= 500000;

500000 개의 행이 만들어졌습니다.

SQL> COMMIT;

커밋이 완료되었습니다.

SQL>
SQL> EXEC DBMS_STATS.gather_table_stats(USER, 'TEST_TAB', cascade => TRUE);

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL>
SQL> SELECT num_col, COUNT(*)
  2  FROM   test_tab
  3  GROUP BY num_col
  4  ORDER BY num_col;

   NUM_COL   COUNT(*)
---------- ----------
        10     100000
        20     133333
        30     266667

SQL> -- TASK 생성하기
SQL> BEGIN
  2    DBMS_PARALLEL_EXECUTE.create_task (task_name => 'test_task');
  3  END;
  4  /

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL> COLUMN task_name FORMAT A10
SQL> SELECT task_name,
  2         status
  3  FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  CREATED

SQL> SELECT DBMS_PARALLEL_EXECUTE.generate_task_name
  2  FROM   dual;

GENERATE_TASK_NAME
--------------------------------------------------------------------------------
TASK$_202

SQL> -- 작업대상 청크를 rowid로 나누기 CREATE_CHUNKS_BY_ROWID
SQL> BEGIN
  2    DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => 'test_task',
  3                                                 table_owner => 'WOONG',
  4                                                 table_name  => 'TEST_TAB',
  5                                                 by_row      => TRUE,
  6                                                 chunk_size  => 10000);
  7  END;
  8  /

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL> SELECT chunk_id, status, start_rowid, end_rowid
  2  FROM   user_parallel_execute_chunks
  3  WHERE  task_name = 'test_task'
  4  ORDER BY chunk_id;

  CHUNK_ID STATUS               START_ROWID        END_ROWID
---------- -------------------- ------------------ ------------------
       203 UNASSIGNED           AAASNmAAGAAAqGQAAA AAASNmAAGAAAqGXCcP
       204 UNASSIGNED           AAASNmAAGAAAqGgAAA AAASNmAAGAAAqGnCcP
       205 UNASSIGNED           AAASNmAAGAAAqGoAAA AAASNmAAGAAAqGvCcP
       206 UNASSIGNED           AAASNmAAGAAAqGwAAA AAASNmAAGAAAqG3CcP
...
       270 UNASSIGNED           AAASNmAAGAAAqsAAAA AAASNmAAGAAAqsxCcP
       271 UNASSIGNED           AAASNmAAGAAAqsyAAA AAASNmAAGAAAqtjCcP
       272 UNASSIGNED           AAASNmAAGAAAqtkAAA AAASNmAAGAAAqt/CcP
       273 UNASSIGNED           AAASNmAAGAAAquAAAA AAASNmAAGAAAquxCcP
       274 UNASSIGNED           AAASNmAAGAAAquyAAA AAASNmAAGAAAqvjCcP
       275 UNASSIGNED           AAASNmAAGAAAqvkAAA AAASNmAAGAAAqv/CcP

73 개의 행이 선택되었습니다.

SQL> -- TASK 실행하기
SQL> DECLARE
  2    l_sql_stmt VARCHAR2(32767);
  3  BEGIN
  4    l_sql_stmt := 'UPDATE test_tab t
  5                   SET    t.num_col = t.num_col + 10,
  6                          t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'')
  7                   WHERE rowid BETWEEN :start_id AND :end_id';
  8
  9    DBMS_PARALLEL_EXECUTE.run_task(task_name      => 'test_task',
 10                                   sql_stmt       => l_sql_stmt,
 11                                   language_flag  => DBMS_SQL.NATIVE,
 12                                   parallel_level => 10);
 13  END;
 14  /

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL> -- 각 세션별 처리 일량 확인
SQL> SELECT session_id, COUNT(*)
  2  FROM   test_tab
  3  GROUP BY session_id
  4  ORDER BY session_id;

SESSION_ID   COUNT(*)
---------- ----------
     40429      89070
     40430      62188
     40431      53153
     40432      49902
     40433      35214
     40434      58915
     40435      40718
     40436      43808
     40437      33674
     40438      33358

10 개의 행이 선택되었습니다.


DIY_11gR2이상 : DBMS_JOB을 이용하여 직접 개발


SQL> truncate table t2;

테이블이 잘렸습니다.

SQL> Create table job_params (
  2  job number primary key,
  3  lo_rid rowid,
  4  hi_rid rowid);

테이블이 생성되었습니다.

SQL> create or replace procedure serial (p_job in number)
  2  is
  3    l_rec job_params%rowtype;
  4  begin
  5    select * into l_rec
  6      from job_params
  7     where job = p_job ;
  8
  9    for x in (select id, text
 10             from t1
 11            where rowid between l_rec.lo_rid and l_rec.hi_rid)
 12  loop
 13      -- 복잡한 처리대신 심플 DML로 진행
 14          update t3 set id = id + 10 where id = x.id;
 15      insert into t2 (id, text, session_id)
 16        values (x.id, x.text, p_job);
 17    end loop;
 18    delete from job_params where job = p_job;
 19  end;
 20  /

프로시저가 생성되었습니다.

SQL> declare
  l_job number;
begin
  for x in (select dbms_rowid.rowid_create(1, data_object_id, lo_fno, lo_block, 0) min_rid,
                dbms_rowid.rowid_create(1,data_object_id, hi_fno, hi_block, 10000) max_rid
           from (select distinct grp,
                      first_value(relative_fno) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) lo_fno,
                      first_value(block_id) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) lo_block,
                      last_value(relative_fno) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) hi_fno,
                      last_value(block_id+blocks-1) over (partition by grp order by relative_fno, block_id rows between unbounded preceding and unbounded following) hi_block,
sum(blocks) over (partition by grp ) sum_blocks
                 from (select relative_fno,
                            block_id,
                            blocks,
                            trunc(sum(blocks) over (order by relative_fno, block_id)-0.01) / 
                                 (sum(blocks) over () / 8 ) grp
                        from dba_extents
                       where segment_name = upper('BIG_TABLE')
                         and owner = user 
                       order by block_id)), 
                      (select data_object_id
                        from user_objects 
                      where object_name = upper('big_table'))
      ) -- for sql 문장
  loop
    dbms_job.submit(l_job, 'serial(job);');
    insert into job_params(job, lo_rid, hi_rid)
      values (l_job, x.min_rid, x.max_rid);
  end loop;
end;
/

PL/SQL 처리가 정상적으로 완료되었습니다.

SQL> select * from job_params;

       JOB LO_RID             HI_RID
---------- ------------------ ------------------
        23 AAASNJAAGAAAACoAAA AAASNJAAGAAAACvCcQ
        24 AAASNJAAGAAAADYAAA AAASNJAAGAAAADfCcQ
        25 AAASNJAAGAAAADwAAA AAASNJAAGAAAAD3CcQ
        26 AAASNJAAGAAAAWAAAA AAASNJAAGAAAAX/CcQ
        27 AAASNJAAGAAABIAAAA AAASNJAAGAAABJ/CcQ
...
       221 AAASNJAAGAAAbSAAAA AAASNJAAGAAAbh/CcQ
       222 AAASNJAAGAAAbyAAAA AAASNJAAGAAAcB/CcQ
       223 AAASNJAAGAAAeiAAAA AAASNJAAGAAAex/CcQ

201 개의 행이 선택되었습니다.

SQL> commit;

커밋이 완료되었습니다.



SQL>