MapReduce + Oracle = Tablefunctions

Posted by jopen, 10 years ago

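One of the things we did at OpenWorld was a neat demonstration of how to implement a MapReduce system on top of a generic Oracle Database. The demo builds on an earlier post on this blog, which shows how to implement table functions, mappers, and so on.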

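But then we thought: why not walk through the table function code and map it onto the MapReduce paradigm, to show which constructs exist in Oracle and how to use them to build a data processing/analytics pipeline? So here is some of the code we used at OpenWorld.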

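To set the stage: we first discuss the header, highlighting the interesting pieces with code comments, and then discuss the body and the actual (simple) mapper and reducer code. The comments will hopefully make this largely self-explanatory...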

Scenario

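What we do here is fairly simple. We create a simple table with a few records and loop over them. The reducer performs an aggregation. The steps are as follows: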

CREATE TABLE sls (salesman VARCHAR2(30), quantity number)     
/

INSERT INTO sls VALUES('Tom', 100);     
INSERT INTO sls VALUES('Chu', 200);      
INSERT INTO sls VALUES('Tom', 300);      
INSERT INTO sls VALUES('Mike', 100);      
INSERT INTO sls VALUES('Scott', 300);      
INSERT INTO sls VALUES('Tom', 250);      
INSERT INTO sls VALUES('Scott', 100);

commit;
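For reference, the aggregation the reducer is meant to produce can be written directly in plain SQL. The query below is a baseline for comparison, not part of the original demo, and assumes only the sls table and the sample rows above.

```sql
-- Baseline: total quantity per salesman, computed with a plain GROUP BY.
-- With the seven sample rows this gives Tom 650, Scott 400, Chu 200, Mike 100.
select salesman as name, sum(quantity) as total
from sls
group by salesman
order by total desc;
```

The table function pipeline built in the rest of this post should return the same name/total pairs, although not necessarily in the same order.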

Header

create or replace package oracle_map_reduce        
is

-- The types we define here are similar to the input files
-- and output files that are used in MR code and are used to
-- store data while we run the actual package.

-- The big advantage is that we do not need to write to disk for     
-- intermediate results.

    type sales_t is table of sls%rowtype;       
    type sale_cur_t is ref cursor return sls%rowtype;        
    type sale_rec_t is record (name varchar2(30), total number);        
    type total_sales_t is table of sale_rec_t;

-- Next we define the functions that do the work and make them known
-- to the outside world

-- Note that both mapper and reducer are tablefunctions!

-- Both mapper and reducer are pipelined and can execute in parallel;
-- the degree of parallelism is driven from the database side and is not
-- scheduled by the actual program

    function mapper(inp_cur in sys_refcursor) return sales_t       
    pipelined parallel_enable (partition inp_cur by any);

-- the pipelined keyword tells the caller that this function acts as     
-- a row source      
--      
-- parallel_enable indicates that this function can be executed in parallel      
-- by the parallel query framework.

    function reducer(in_cur in sale_cur_t) return total_sales_t       
    pipelined parallel_enable (partition in_cur by hash(salesman))

-- Finally we can cluster the results so that similar rows are chunked      
-- together when used (note this does not drive distribution over the       
-- parallel slaves, which is done by the partition clause shown in the mapper      
-- and reducers)

    cluster in_cur by (salesman);

end;       
/

-- The body of the package has the mapper and the reducer code     
-- The header as is shown here by itself defines the signature of       
-- the package and declares types and variables to be used in the      
-- package.
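The partition and streaming clauses come in other forms as well. As a hedged sketch only, a reducer could be declared with range partitioning and ordered (rather than clustered) input. The package name oracle_map_reduce_alt and the function name reducer_by_range are hypothetical and do not appear in the original demo. Only the specification is shown, so a matching body would be needed before the function could be called.

```sql
-- Hypothetical variant of the reducer declaration (names are illustrative).
-- Assumes the sls table from the Scenario section exists.
create or replace package oracle_map_reduce_alt
is
    type sale_cur_t is ref cursor return sls%rowtype;
    type sale_rec_t is record (name varchar2(30), total number);
    type total_sales_t is table of sale_rec_t;

    -- partition ... by range sends contiguous ranges of salesman values
    -- to each parallel instance; order ... by delivers the rows of each
    -- instance sorted by salesman instead of merely grouped together.
    function reducer_by_range(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by range(salesman))
    order in_cur by (salesman);
end;
/
```

Ordering does more than the reducer logic in this post needs, since that logic only requires that all rows of one salesman arrive together, so cluster is the lighter choice here.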

Body

create or replace package oracle_map_reduce      
is      
    type sales_t is table of sls%rowtype;      
    type sale_cur_t is ref cursor return sls%rowtype;      
    type sale_rec_t is record (name varchar2(30), total number);      
    type total_sales_t is table of sale_rec_t;

    function mapper(inp_cur in sys_refcursor) return sales_t     
    pipelined parallel_enable (partition inp_cur by any);

    function reducer(in_cur in sale_cur_t) return total_sales_t     
    pipelined parallel_enable (partition in_cur by hash(salesman))

    cluster in_cur by (salesman);

end;     
/

-- The upper part is the header, the following part is the body
-- Note the difference in the create statement below as compared
-- to the header

create or replace package body oracle_map_reduce        
is

    function mapper(inp_cur in sys_refcursor) return sales_t       
    pipelined parallel_enable (partition inp_cur by any)        
    is        
        sales_rec sls%ROWTYPE;        
        -- construct a record to hold an entire row from the SLS table      
        begin       
            -- First loop over all records in the table      
            loop       
                fetch inp_cur into sales_rec;        
                exit when inp_cur%notfound;        
                -- Place the found records from SLS into the variable      
                -- end the loop when there are no more rows to loop over      
                pipe row (sales_rec);       
                -- by using pipe row here we are giving back rows in streaming
                -- fashion as you would expect from a table
                -- this in combination with pipelined in the definition allows
                -- the pipelining (i.e. giving data as it comes on board) of
                -- a table function
            end loop;       
            return;

-- Return is a mandatory piece that allows the consumer of data (our reducer
-- in this case) to ensure all data has been sent. After return the rowsource
-- is exhausted and no more data comes from this function.

        end mapper;

-- The above mapper does in effect nothing other than streaming data
-- partitioned over to the next step. In MR the stream would be written
-- to a file and then redistributed to the reducers

-- The reducer below computes and emits the sales figures     
 

    function reducer(in_cur in sale_cur_t) return total_sales_t       
    pipelined parallel_enable (partition in_cur by hash(salesman))        
    -- The partition by clause indicates that all instances of a particular
    -- salesman must be sent to one instance of the reducer function

    cluster in_cur by (salesman)

    -- The cluster by clause tells the parallel query framework to cluster     
    -- all instances of a particular salesman together.

        IS

        sale_rec sls%ROWTYPE;       
        total_sale_rec sale_rec_t;

        -- two containers are created, one as input the other as output

        begin

            total_sale_rec.total := 0;       
            total_sale_rec.name := NULL;

            -- reset the values to initial values     
            loop       
                fetch in_cur into sale_rec;        
                exit when in_cur%notfound;

           -- some if then logic to ensure we pipe a row once all is processed

                if (total_sale_rec.name is null) then       
           -- The first instance is arriving, set the salesman value to that      
           -- input value      
           -- update 0 plus the incoming value for total

                    total_sale_rec.name := sale_rec.salesman;       
                    total_sale_rec.total := total_sale_rec.total +  
                    sale_rec.quantity;

                elsif ( total_sale_rec.name <> sale_rec.salesman) then

                -- We now switch salesman, and are done with the previous
                -- salesman (as rows are partitioned and clustered)
                -- First pipe out the result of the previous salesman we
                -- processed
                -- then update the information to work on this new salesman
                    pipe row (total_sale_rec);       
                    total_sale_rec.name := sale_rec.salesman;        
                    total_sale_rec.total := sale_rec.quantity;

                else

                -- We get here when we work on the same salesman and just add
                -- to the total, then move on to the next record

                    total_sale_rec.total := total_sale_rec.total +       
                    sale_rec.quantity;

                end if;       
            end loop;

            -- The next piece of code ensures that the last salesman, who has
            -- not been piped out yet, is piped out to the consumer. If there
            -- is a single salesman, he is only piped out in this piece of
            -- logic, as the loop above only pipes out upon a change of
            -- salesman. We test the name rather than the total so that a
            -- salesman whose quantities sum to zero is not dropped.

            if total_sale_rec.name is not null then
                pipe row (total_sale_rec);
            end if;

            return;

            -- Again, we are now done and have piped all rows to our consumer

        end reducer;       
    end;        
    /

Using it in a SQL query

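It took a while to get here, but once you see the query you can see how you could build up a series of predefined routines and then string them together to produce a result set.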

select *       
from table(oracle_map_reduce.reducer(cursor(        
          select * from table(oracle_map_reduce.mapper(cursor(       
                 select * from sls))) map_result)));

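Every piece of logic pipes its data to the next consumer, and all of it can run in parallel. That makes this suitable for things like heavy in-database ETL (which is what we invented it for in the first place) and for anything that needs to apply a lot of logic to records (such as analytical processing).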
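Whether the stages actually run in parallel depends on the database's parallel query settings and on the input cursor being parallelized. A minimal sketch, assuming a hint-based degree of 4 on the scan of sls (the alias s and the degree are arbitrary choices made for illustration, not values from the original demo):

```sql
-- Assumption: a degree of parallelism of 4 is requested purely for
-- illustration; the optimizer and instance settings decide what is used.
select *
from table(oracle_map_reduce.reducer(cursor(
          select * from table(oracle_map_reduce.mapper(cursor(
                 select /*+ parallel(s, 4) */ * from sls s))) map_result)));
```

Because the reducer is declared with partition in_cur by hash(salesman), all rows for a given salesman go to the same reducer instance, so each name should still appear exactly once in the result regardless of the degree used.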