Integrating DataX with FusionInsight

Applicable Scenarios

DataX 0.1 ↔ FusionInsight HD 6.5 (HDFS)

DataX 0.1 ↔ FusionInsight MRS 8.0 (HDFS)

Environment Setup

  • Download and extract DataX

    wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
    tar -zxvf datax.tar.gz
    
  • Download the user's authentication credentials from FusionInsight and place krb5.conf in /etc

  • From the same downloaded credentials, place user.keytab in /opt
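
Before running any job, it can help to confirm that both Kerberos files are where the job files expect them. The following is a minimal Python sketch and not part of DataX; the helper name missing_files is made up for illustration, and the paths are simply the ones used in this guide.

```python
import os

# Paths used in this guide; adjust them if your files live elsewhere.
REQUIRED_FILES = ["/etc/krb5.conf", "/opt/user.keytab"]


def missing_files(paths):
    """Return the paths that are not readable regular files (hypothetical helper)."""
    return [p for p in paths if not (os.path.isfile(p) and os.access(p, os.R_OK))]


if __name__ == "__main__":
    missing = missing_files(REQUIRED_FILES)
    if missing:
        print("Missing or unreadable: " + ", ".join(missing))
    else:
        print("All Kerberos files are in place.")
```

Run it as the same OS user that will launch DataX, since readability depends on that user's permissions.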

Reading from HDFS

  • Create a file named hdfsread.json with the following content:

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 3
          }
        },
        "content": [
          {
            "reader": {
              "name": "hdfsreader",
              "parameter": {
                "path": "/user/developuser/datax/*",
                "defaultFS": "hdfs://hacluster",
                "column": [
                  {
                    "index": 0,
                    "type": "string"
                  },
                  {
                    "index": 1,
                    "type": "string"
                  },
                  {
                    "type": "string",
                    "value": "hello"
                  },
                  {
                    "index": 2,
                    "type": "string"
                  }
                ],
                "fileType": "csv",
                "encoding": "UTF-8",
                "fieldDelimiter": ",",
                "haveKerberos": "true",
                "kerberosKeytabFilePath": "/opt/user.keytab",
                "kerberosPrincipal": "developuser@HADOOP.COM",
                "hadoopConfig": {
                  "dfs.nameservices": "hacluster",
                  "dfs.ha.namenodes.hacluster": "15,16",
                  "dfs.namenode.rpc-address.hacluster.15": "172.16.10.132:25000",
                  "dfs.namenode.rpc-address.hacluster.16": "172.16.10.133:25000",
                  "dfs.client.failover.proxy.provider.hacluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                  "hadoop.security.authentication": "Kerberos",
                  "hadoop.rpc.protection": "privacy"
                }
              }
            },
            "writer": {
              "name": "streamwriter",
              "parameter": {
                "print": true
              }
            }
          }
        ]
      }
    }
    

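    The key parameters are configured as follows:

    Parameter                                     Description
    defaultFS                                     HDFS nameservice URI; on a FusionInsight cluster the default nameservice is hacluster, i.e. hdfs://hacluster
    haveKerberos                                  true
    kerberosKeytabFilePath                        Path to the keytab file, e.g. /opt/user.keytab
    kerberosPrincipal                             Kerberos user principal, e.g. developuser@HADOOP.COM
    hadoop.security.authentication                Take the value from the cluster's HDFS service configuration, e.g. Kerberos
    hadoop.rpc.protection                         Take the value from the cluster's HDFS service configuration, e.g. privacy
    dfs.nameservices                              Take the value from the cluster client's hdfs-site.xml, e.g. hacluster
    dfs.ha.namenodes.hacluster                    Take both the parameter name and the value from hdfs-site.xml; they differ for every cluster
    dfs.namenode.rpc-address.hacluster.15         Take both the parameter name and the value from hdfs-site.xml; they differ for every cluster
    dfs.namenode.rpc-address.hacluster.16         Take both the parameter name and the value from hdfs-site.xml; they differ for every cluster
    dfs.client.failover.proxy.provider.hacluster  Defaults to org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
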
  • Run the DataX job from the DataX bin directory:

    python datax.py ../hdfsread.json
    
  • The job reads the files in HDFS and prints their contents to the console
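
Because the HA parameter names and values differ from cluster to cluster, copying them by hand from hdfs-site.xml is error-prone. The sketch below shows one possible way to extract them with the Python standard library. It is an illustration, not a DataX feature: the helper names load_properties and build_hadoop_config are hypothetical, the nameservice is assumed to be hacluster, and hadoop.security.authentication and hadoop.rpc.protection still have to be added from the HDFS service configuration.

```python
import json
import sys
import xml.etree.ElementTree as ET


def load_properties(xml_data):
    """Parse Hadoop-style <configuration> XML (str or bytes) into a {name: value} dict."""
    root = ET.fromstring(xml_data)
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None and value is not None:
            props[name.strip()] = value.strip()
    return props


def build_hadoop_config(props, nameservice="hacluster"):
    """Collect the HA-related keys for the job file's hadoopConfig block.

    Assumption: the nameservice is "hacluster"; pass another name if yours differs.
    """
    config = {"dfs.nameservices": nameservice}
    nn_key = "dfs.ha.namenodes." + nameservice
    config[nn_key] = props[nn_key]
    # One rpc-address entry is expected per NameNode ID listed under nn_key.
    for nn_id in props[nn_key].split(","):
        rpc_key = "dfs.namenode.rpc-address.%s.%s" % (nameservice, nn_id.strip())
        config[rpc_key] = props[rpc_key]
    provider_key = "dfs.client.failover.proxy.provider." + nameservice
    if provider_key in props:
        config[provider_key] = props[provider_key]
    return config


# Usage: pass the path of the client's hdfs-site.xml as the first argument.
if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1], "rb") as f:
        print(json.dumps(build_hadoop_config(load_properties(f.read())), indent=2))
```

The printed JSON can be pasted into the hadoopConfig block of the job file. A missing key raises KeyError, which is deliberate here: it signals that the file does not describe the assumed nameservice.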

Writing to HDFS

  • Create a file named hdfswrite.json with the following content:

    {
      "setting": {},
      "job": {
        "setting": {
          "speed": {
            "channel": 2
          }
        },
        "content": [
          {
            "reader": {
              "name": "txtfilereader",
              "parameter": {
                "path": ["/opt/data.csv"],
                "encoding": "UTF-8",
                "column": [
                  {
                    "index": 0,
                    "type": "long"
                  },
                  {
                    "index": 1,
                    "type": "DOUBLE"
                  },
                  {
                    "index": 2,
                    "type": "STRING"
                  },
                  {
                    "index": 3,
                    "type": "BOOLEAN"
                  },
                  {
                    "index": 4,
                    "type": "date"
                  }
                ],
                "fieldDelimiter": ","
              }
            },
            "writer": {
              "name": "hdfswriter",
              "parameter": {
                "defaultFS": "hdfs://hacluster",
                "fileType": "orc",
                "path": "/user/developuser/hdfswrite",
                "fileName": "hdfsdata.orc",
                "column": [
                  {
                    "name": "col1",
                    "type": "TINYINT"
                  },
                  {
                    "name": "col2",
                    "type": "DOUBLE"
                  },
                  {
                    "name": "col3",
                    "type": "CHAR"
                  },
                  {
                    "name": "col4",
                    "type": "BOOLEAN"
                  },
                  {
                    "name": "col5",
                    "type": "DATE"
                  }
                ],
                "writeMode": "append",
                "fieldDelimiter": ",",
                "compress": "NONE",
                "haveKerberos": "true",
                "kerberosKeytabFilePath": "/opt/user.keytab",
                "kerberosPrincipal": "developuser@HADOOP.COM",
                "hadoopConfig": {
                  "dfs.nameservices": "hacluster",
                  "dfs.ha.namenodes.hacluster": "15,16",
                  "dfs.namenode.rpc-address.hacluster.15": "172.16.10.132:25000",
                  "dfs.namenode.rpc-address.hacluster.16": "172.16.10.133:25000",
                  "dfs.client.failover.proxy.provider.hacluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                  "hadoop.security.authentication": "Kerberos",
                  "hadoop.rpc.protection": "privacy"
                }
              }
            }
          }
        ]
      }
    }
    

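    The key parameters are configured as follows:

    Parameter                                     Description
    defaultFS                                     HDFS nameservice URI; on a FusionInsight cluster the default nameservice is hacluster, i.e. hdfs://hacluster
    haveKerberos                                  true
    kerberosKeytabFilePath                        Path to the keytab file, e.g. /opt/user.keytab
    kerberosPrincipal                             Kerberos user principal, e.g. developuser@HADOOP.COM
    hadoop.security.authentication                Take the value from the cluster's HDFS service configuration, e.g. Kerberos
    hadoop.rpc.protection                         Take the value from the cluster's HDFS service configuration, e.g. privacy
    dfs.nameservices                              Take the value from the cluster client's hdfs-site.xml, e.g. hacluster
    dfs.ha.namenodes.hacluster                    Take both the parameter name and the value from hdfs-site.xml; they differ for every cluster
    dfs.namenode.rpc-address.hacluster.15         Take both the parameter name and the value from hdfs-site.xml; they differ for every cluster
    dfs.namenode.rpc-address.hacluster.16         Take both the parameter name and the value from hdfs-site.xml; they differ for every cluster
    dfs.client.failover.proxy.provider.hacluster  Defaults to org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
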
  • Run the DataX job from the DataX bin directory:

    python datax.py ../hdfswrite.json
    
  • The data is written to HDFS successfully
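
Most failures with these job files come from Kerberos or HA settings that do not line up with each other. As a rough pre-flight check, the following Python sketch inspects a job file before it is handed to DataX. It is not part of DataX: the function names check_hdfs_parameter and check_job are hypothetical, and the rules it applies (defaultFS must equal hdfs://<nameservice>, and every NameNode ID needs an rpc-address entry) are only the conventions used in this guide.

```python
import json


def check_hdfs_parameter(parameter):
    """Return a list of problems found in one hdfsreader/hdfswriter parameter block."""
    problems = []
    # haveKerberos may be written as the string "true" or as a JSON boolean.
    if str(parameter.get("haveKerberos", "")).lower() == "true":
        for key in ("kerberosKeytabFilePath", "kerberosPrincipal"):
            if not parameter.get(key):
                problems.append("missing " + key)
    hadoop = parameter.get("hadoopConfig", {})
    nameservice = hadoop.get("dfs.nameservices")
    if not nameservice:
        problems.append("missing dfs.nameservices")
        return problems
    if parameter.get("defaultFS") != "hdfs://" + nameservice:
        problems.append("defaultFS does not match dfs.nameservices")
    nn_key = "dfs.ha.namenodes." + nameservice
    nn_ids = [s.strip() for s in hadoop.get(nn_key, "").split(",") if s.strip()]
    if not nn_ids:
        problems.append("missing " + nn_key)
    for nn_id in nn_ids:
        rpc_key = "dfs.namenode.rpc-address.%s.%s" % (nameservice, nn_id)
        if rpc_key not in hadoop:
            problems.append("missing " + rpc_key)
    return problems


def check_job(job_text):
    """Check every hdfsreader/hdfswriter block in a DataX job given as JSON text."""
    job = json.loads(job_text)
    problems = []
    for item in job["job"]["content"]:
        for role in ("reader", "writer"):
            plugin = item.get(role, {})
            if plugin.get("name") in ("hdfsreader", "hdfswriter"):
                for problem in check_hdfs_parameter(plugin.get("parameter", {})):
                    problems.append("%s: %s" % (plugin["name"], problem))
    return problems
```

For example, calling check_job on the text of hdfswrite.json returns an empty list when the settings are mutually consistent, and a list of messages otherwise. It cannot tell whether the values match the actual cluster.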