mysql - A way to read table data from MySQL into Pig

Tags: mysql hadoop apache-pig

As everyone knows, Pig supports DBStorage, but it only supports storing results from Pig into MySQL:

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');

But how can I read a table from MySQL, along these lines?

data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');

Here is my code:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    // initialized up front, otherwise getNext() throws a NullPointerException
    private ArrayList<Object> mProtoTuple = new ArrayList<Object>();
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {

        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // placeholder: TextInputFormat is probably not right here, since the
        // data comes from a JDBC query rather than from files
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        boolean next = false;

        try {
            next = result.next();
        } catch (SQLException e) {
            // propagate instead of swallowing the error
            throw new IOException("Error advancing the result set", e);
        }

        if (!next)
            return null;
        int numColumns = 0;
        // get the column count from the result set metadata
        ResultSetMetaData rsmd;
        try {
            rsmd = result.getMetaData();
            numColumns = rsmd.getColumnCount();
        } catch (SQLException e) {
            // propagate instead of swallowing the error
            throw new IOException("Error reading result set metadata", e);
        }

        for (int i = 0; i < numColumns; i++) {

            try {
                // JDBC result set columns are 1-indexed, hence i + 1
                Object field = result.getObject(i + 1);

                switch (DataType.findType(field)) {
                case DataType.NULL:

                    mProtoTuple.add(null);

                    break;

                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);

                    break;

                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);

                    break;

                case DataType.LONG:
                    mProtoTuple.add((Long) field);

                    break;

                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);

                    break;

                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);

                    break;

                case DataType.BYTEARRAY:
                    byte[] b = ((DataByteArray) field).get();
                    mProtoTuple.add(b);

                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);

                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);

                    break;

                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot store a non-flat tuple "
                            + "using DbStorage");

                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));

                }

            } catch (Exception ee) {
                throw new RuntimeException(ee);
            }
        }

        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException {

        con = null;
        if (query == null) {
            throw new IOException("SQL Insert command not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
        count = 0;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // nothing to do here: the data comes from the JDBC query,
        // not from an HDFS location
    }

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                InterruptedException {
            // not implemented yet: a RecordReader backed by the JDBC result set should go here
            return null;
        }

        @Override
        public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                InterruptedException {
            // not implemented yet: a single split would be enough for one JDBC query
            return null;
        }

    }

}
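
For reference, this is roughly how I expect to invoke the loader from Pig once it works (the jar name and the quoted location are just placeholders; the location string is unused because the data comes from the query):

REGISTER dbloader.jar;
data = LOAD 'unused' USING DBLoader('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'user', 'pass', 'SELECT * FROM my_table');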

I have tried writing the UDF many times, but without success...

Best Answer

As you say, DBStorage only supports saving results from Pig into a database.

To load data from MySQL, you could look at a project named Sqoop (which copies data from a database to HDFS), or you could take a mysqldump and then copy the file into HDFS. Both approaches require some interaction outside of Pig and cannot be used directly from within it.
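
For illustration, a Sqoop import along these lines copies a table into HDFS (host, credentials, and paths here are placeholders):

sqoop import --connect jdbc:mysql://host/db --username user --password pass --table my_table --target-dir /user/hadoop/my_table

Sqoop writes comma-delimited text files by default, so the result can then be read from Pig with something like data = LOAD '/user/hadoop/my_table' USING PigStorage(',');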

A third option would be to look into writing a Pig LoadFunc (you say you tried to write a UDF). It shouldn't be too hard; you would need to pass the same options as DBStorage (the driver, the connection credentials, and the SQL query to execute), and you could probably also use some result set metadata inspection to auto-generate a schema.
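
As a rough sketch of that metadata idea, a getSchema implementation along the following lines could derive a Pig schema from the query, assuming the loader also implements org.apache.pig.LoadMetadata (the LIMIT 0 trick is MySQL-specific, and the type mapping below is illustrative rather than exhaustive):

import java.io.IOException;
import java.sql.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;

// Sketch only: belongs inside the loader class, reusing its
// jdbcURL, user, pass and query fields.
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    // execute the query without fetching any rows, purely for its metadata
    try (Connection con = DriverManager.getConnection(jdbcURL, user, pass);
         Statement st = con.createStatement();
         ResultSet rs = st.executeQuery(query + " LIMIT 0")) {
        ResultSetMetaData rsmd = rs.getMetaData();
        ResourceFieldSchema[] fields = new ResourceFieldSchema[rsmd.getColumnCount()];
        for (int i = 1; i <= rsmd.getColumnCount(); i++) {
            ResourceFieldSchema field = new ResourceFieldSchema();
            field.setName(rsmd.getColumnLabel(i));
            // map a few common JDBC types onto Pig types
            switch (rsmd.getColumnType(i)) {
            case Types.INTEGER:
                field.setType(DataType.INTEGER);
                break;
            case Types.BIGINT:
                field.setType(DataType.LONG);
                break;
            case Types.FLOAT:
            case Types.REAL:
                field.setType(DataType.FLOAT);
                break;
            case Types.DOUBLE:
                field.setType(DataType.DOUBLE);
                break;
            case Types.CHAR:
            case Types.VARCHAR:
                field.setType(DataType.CHARARRAY);
                break;
            default:
                // fall back to bytearray for anything unmapped
                field.setType(DataType.BYTEARRAY);
            }
            fields[i - 1] = field;
        }
        ResourceSchema schema = new ResourceSchema();
        schema.setFields(fields);
        return schema;
    } catch (SQLException e) {
        throw new IOException("Unable to derive a schema from the query", e);
    }
}

With a schema in place, the Pig script can refer to columns by name instead of by position.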

Regarding mysql - A way to read table data from MySQL into Pig, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/10942739/
