Trying out DataX: Mongo→HDFS & HDFS→Mongo

Published 2024-01-08 15:21:43 | Author: 粒子先生

Installation

Download the installation package

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

Extract the archive and DataX is ready to use.
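A minimal sketch of the extraction and a first smoke test, assuming the tarball unpacks to a datax/ directory with the usual bin/, conf/, job/ and plugin/ layout, and that Python and a JDK are already on the machine:

tar -zxvf datax.tar.gz
cd datax/bin
# run the bundled self-check job (streamreader -> streamwriter) to verify the installation
python ./datax.py ../job/job.json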

Configuration

Generate job templates

python ./datax.py -r hdfsreader -w mongodbwriter 
python ./datax.py -r mongodbreader -w hdfswriter
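The templates are printed to stdout together with a short banner, so one way to bootstrap the two job files used below is to redirect the output and then delete the non-JSON banner lines by hand (file names here match the ones used in the Run section):

python ./datax.py -r mongodbreader -w hdfswriter > mongo_hdfs.json
python ./datax.py -r hdfsreader -w mongodbwriter > hdfs_mongo.json
# strip everything above the opening "{" so that only the JSON object remains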

Mongo→HDFS configuration file (mongo_hdfs.json)

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mongodbreader",
          "parameter": {
            "address": [
              "172.17.5.96:27017"
            ],
            "collectionName": "cpws_test",
            "column": [
              {
                "name": "data",
                "type": "STRING"
              },
              {
                "name": "uniqid",
                "type": "STRING"
              }
            ],
            "dbName": "test",
            "userName": "",
            "userPassword": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {
                "name": "data",
                "type": "STRING"
              },
              {
                "name": "uniqid",
                "type": "STRING"
              }
            ],
            "compress": "",
            "defaultFS": "hdfs://172.31.6.20:9000",
            "fieldDelimiter": "\t",
            "fileName": "cpws_test",
            "fileType": "text",
            "path": "/sql_test_11",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
      }
    }
  }
}
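hdfswriter generally requires path to already exist as a directory on HDFS and treats fileName only as a prefix (each task appends its own suffix), so it helps to pre-create the directory and to inspect the output with the HDFS CLI afterwards; a sketch, assuming an HDFS client pointed at hdfs://172.31.6.20:9000:

hdfs dfs -mkdir -p /sql_test_11
# after the job finishes, files appear under the path with the configured fileName as prefix
hdfs dfs -ls /sql_test_11
hdfs dfs -cat /sql_test_11/cpws_test* | head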

HDFS→Mongo configuration file (hdfs_mongo.json)

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "column": [
              {
                "index": 0,
                "type": "STRING"
              },
              {
                "index": 1,
                "type": "STRING"
              }
            ],
            "defaultFS": "hdfs://172.31.6.20:9000",
            "encoding": "UTF-8",
            "fieldDelimiter": "\t",
            "fileType": "text",
            "path": "/sql_test_11"
          }
        },
        "writer": {
          "name": "mongodbwriter",
          "parameter": {
            "address": [
              "172.17.5.96:27017"
            ],
            "collectionName": "cpws_test_01",
            "column": [
              {
                "name": "data",
                "type": "STRING"
              },
              {
                "name": "uniqid",
                "type": "STRING"
              }
            ],
            "dbName": "test",
            "upsertInfo": {
              "isUpsert": "",
              "upsertKey": ""
            },
            "userName": "",
            "userPassword": ""
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
      }
    }
  }
}
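A quick sanity check on the reverse load, assuming the legacy mongo shell can reach 172.17.5.96:27017, is to count the documents that landed in the target collection:

mongo 172.17.5.96:27017/test --eval "db.cpws_test_01.count()"
# the count should match the "Total records read" figure reported by DataX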

Run the jobs

python ./datax.py hdfs_mongo.json
python ./datax.py mongo_hdfs.json
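For larger collections, the per-job JVM heap and the channel count under setting.speed are the usual tuning knobs; datax.py accepts a --jvm flag for the former (check python ./datax.py --help for the exact option names in your version):

# example: raise the heap for the Mongo→HDFS job; flag name taken from datax.py's help output
python ./datax.py --jvm="-Xms1g -Xmx1g" mongo_hdfs.json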

Test results

2020-11-18 11:19:28.454 [job-0] INFO StandAloneJobContainerCommunicator - Total 300739 records, 5825955 bytes | Speed 189.65KB/s, 10024 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.156s | All Task WaitReaderTime 37.544s | Percentage 100.00%
2020-11-18 11:19:28.455 [job-0] INFO JobContainer -
Job start time : 2020-11-18 11:18:57
Job end time : 2020-11-18 11:19:28
Total elapsed time : 31s
Average throughput : 189.65KB/s
Record write speed : 10024rec/s
Total records read : 300739
Total read/write failures : 0
2020-11-18 11:35:57.340 [job-0] INFO StandAloneJobContainerCommunicator - Total 300739 records, 5825955 bytes | Speed 568.94KB/s, 30073 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 4.062s | All Task WaitReaderTime 0.107s | Percentage 100.00%
2020-11-18 11:35:57.341 [job-0] INFO JobContainer -
Job start time : 2020-11-18 11:35:45
Job end time : 2020-11-18 11:35:57
Total elapsed time : 12s
Average throughput : 568.94KB/s
Record write speed : 30073rec/s
Total records read : 300739
Total read/write failures : 0

Miscellaneous

Data transformation functions (transformers) supported by DataX:

https://github.com/alibaba/DataX/blob/master/transformer/doc/transformer.md
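Transformers are declared inside job.content alongside reader and writer. A minimal fragment based on that document, e.g. keeping only the first 10 characters of column 0 with dx_substr (parameter names follow the transformer doc; verify against your DataX version):

"transformer": [
  {
    "name": "dx_substr",
    "parameter": {
      "columnIndex": 0,
      "paras": ["0", "10"]
    }
  }
]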