數據集成平台:datax將MySQL數據同步到hive(全部列和指定列)

news/发布时间2024/5/19 15:57:35

1.數據集成平台:將MySQL數據同步到hive(全部和指定列)

  1. python環境:2.7版本
  2. py腳本
    傳參:

source_database:數據庫
source_table:表
source_columns:列
source_splitPk:split key,要求必須是int類型

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "47.57.227.5"
mysql_port = "3306"
mysql_user = "vinson_readonly"
mysql_passwd = "8AGY5Eqq8Ac8VR7b"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"#生成配置文件的目标路径,可根据实际情况作出修改
def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table, columns):connection = get_connection()cursor = connection.cursor()if columns == 'all':# 如果传入 '*' 表示要所有列sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION" % (database, table)else:# 传入指定列# 将每个列名加上单引号columns = ', '.join("'%s'" % col.strip() for col in columns.split(','))sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND COLUMN_NAME IN (%s) ORDER BY ORDINAL_POSITION" % (database, table, columns)cursor.execute(sql)fetchall = cursor.fetchall()# print(fetchall)cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table,source_columns):return map(lambda x: x[0], get_mysql_meta(database,table,source_columns))def get_hive_columns(database, table,source_columns):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","mediumint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string","bit": "string",}return mappings[mysql_type]meta = get_mysql_meta(database, table,source_columns)return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)def generate_json(source_database, source_table,source_columns,source_splitPk):job = {"job": {"setting": {"speed": {"channel": 15},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","batchSize":"8192","batchByteSize":"33554432","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table,source_columns),"splitPk": source_splitPk,"connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database + "?userCompress=true&useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=false"]}]}},"writer": {"name": "hdfswriter","batchSize":"8192","batchByteSize":"33554432","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","path": "${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table,source_columns),"writeMode": "append","fieldDelimiter": u"\u0001","compress": "gzip"}},"transformer": [{"name": "dx_groovy","parameter": {"code": "for(int i=0;i<record.getColumnNumber();i++){if(record.getColumn(i).getByteSize()!=0){Column column = record.getColumn(i); def str = column.asString(); def newStr=null; newStr=str.replaceAll(\"[\\r\\n]\",\"\"); record.setColumn(i, new StringColumn(newStr)); };};return record;","extraPackage":[]}}]}]}}output_path = "/opt/module/datax/job/import/" + source_databaseif not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f)def main(args):source_database = ""source_table = ""source_columns = ""source_splitPk = ""options, arguments = getopt.getopt(args, 'd:t:c:k:', ['sourcedb=', 'sourcetbl=', 'columns=', 'splitPk='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valueif opt_name in ('-c', '--columns'):source_columns = opt_valueif opt_name in ('-k', '--splitPk'):source_splitPk = opt_valuegenerate_json(source_database, source_table,source_columns,source_splitPk)if __name__ == '__main__':main(sys.argv[1:])
  1. sh腳本
#!/bin/bash
python ~/bin/sap_gateway_gen_import_config.py -d db -t table -c Id,created_date -k selfincrementid
python ~/bin/sap_gateway_gen_import_config.py  -d db -t table  -c all -k selfincrementid

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/VjPF/3856.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

【数据结构】C语言实现二叉树的相关操作

树 定义 树&#xff08;Tree&#xff09;是 n (n > 0) 个结点的有限集 若 n 0&#xff0c;称为空树 若 n > 0&#xff0c;则它满足如下两个条件&#xff1a; 有且仅有一个特定的称为根&#xff08;Root&#xff09;的结点其余结点可分为 m(m>0) 个互不相交的有限…

洛谷P8772 [蓝桥杯 2022 省 A] 求和(前缀和差分)

#include <stdio.h> #include<stdlib.h> int main() {int n;scanf("%d", &n);// 读取数组 aint* a (int*)malloc(n * sizeof(int));for (int i 0; i < n; i) {scanf("%d", &a[i]);}// 计算前缀和数组 prefix_sumlong long *prefi…

城市白模:裸眼3D下的未来都市构想

随着科技的飞速发展&#xff0c;城市规划与建设已经迈入了一个全新的时代。在这个时代里&#xff0c;“城市白模”成为了设计师、建筑师、城市规划者乃至普通市民的热门话题。那么&#xff0c;什么是“城市白模”&#xff1f;它又如何改变我们对城市的认知与期待呢&#xff1f;…

【MySQL 探索之旅】初始MySQL数据库

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有…

Django后端开发——ORM

文章目录 参考资料ORM-基础字段及选项字段类型练习——添加模型类应用bookstore下的models.py数据库迁移——同步至mysqlmysql中查看效果字段选项Meta类定义示例:改表名应用bookstore下的models.py终端效果练习——改表名+字段选项修改应用bookstore下的models.py终端效果ORM基…

css3盒子

盒子模型 一.看透网页布局本质二.认识盒子三.盒子的边框&#xff08;border&#xff09;1.概念2.简写及分开写法3.合并问题&#xff08;会相加&#xff09;4.边框会影响盒子实际大小 四.盒子的内边距&#xff08;padding&#xff09;1.概念2.简写3.内边距会影响盒子实际大小4.特…

如何在本地服务器部署TeslaMate并远程查看特斯拉汽车数据无需公网ip

文章目录 1. Docker部署TeslaMate2. 本地访问TeslaMate3. Linux安装Cpolar4. 配置TeslaMate公网地址5. 远程访问TeslaMate6. 固定TeslaMate公网地址7. 固定地址访问TeslaMate TeslaMate是一个开源软件&#xff0c;可以通过连接特斯拉账号&#xff0c;记录行驶历史&#xff0c;统…

探索AI视频生成新纪元:文生视频Sora VS RunwayML、Pika及StableVideo——谁将引领未来

由于在AI生成视频的时长上成功突破到一分钟&#xff0c;再加上演示视频的高度逼真和高质量&#xff0c;Sora立刻引起了轰动。在Sora横空出世之前&#xff0c;Runway一直被视为AI生成视频的默认选择&#xff0c;尤其是自去年11月推出第二代模型以来&#xff0c;Runway还被称为“…

MySQL死锁产生的原因和解决方法

一.什么是死锁 要想知道MYSQL死锁产生的原因,就要知道什么是死锁?在了解什么是死锁之前,先来看一个概念:线程安全问题 1.线程安全问题 1.1什么是线程安全问题 线程安全问题&#xff0c;指的是在多线程环境当中&#xff0c;线程并发访问某个资源&#xff0c;从而导致的原子性&a…

Linux安装HBase

目录 前提下载安装配置启动 前提 安装hadoop安装ZooKeeper 下载安装 镜像↓&#xff0c;版本选对 https://mirrors.tuna.tsinghua.edu.cn/apache/hbase/2.4.17/ 下载&#xff0c;解压&#xff0c;重命名&#xff0c;路径复制对 wget https://mirrors.tuna.tsinghua.edu.cn/…

代码随想录三刷day04 | 链表之 24 两两交换链表中的节点 19删除链表的倒数第N个节点 面试题 02.07链表相交 142环形链表II

三刷day04 24. 两两交换链表中的节点19.删除链表的倒数第N个节点面试题 02.07. 链表相交142.环形链表II 24. 两两交换链表中的节点 题目链接 解题思路&#xff1a; 先将一些可能会改变的节点保存一下&#xff0c;然后再按照三个步骤就行修改 注意 要使用改变以后节点的指针&am…

Spring Session:入门案例

Spring Session provides an API and implementations for managing a user’s session information. Spring Session提供了一种用于管理用户session信息管理的API。 Spring Session特点 传统的Servlet应用中&#xff0c;Session是存储在服务端的&#xff0c;即&#xff1a;Ses…

消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

4、Work Queues Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务&#xff0c;而不得不等待它完成。我们把任务封装为消息并将其发送到队列&#xff0c;在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时&#xff0c;这些工作…

山海鲸可视化软件:多场景下的数据呈现利器

在当今数据驱动的时代&#xff0c;数据可视化成为了企业和个人不可或缺的工具。作为一个老数据人&#xff0c;本文想借用自己常用山海鲸可视化软件&#xff0c;带大家了解在不同使用场景下数据可视化的应用。山海鲸可视化是一款可以免费编辑、本地化部署的产品&#xff0c;对数…

计算机网络-局域网

文章目录 局域网局域网拓扑结构以太网以太网传输介质以太网时隙提高传统以太网带宽的途径以太网帧格式 局域网协议IEEE 802参考模型IEEE802.2协议LLC帧格式及其控制字段LLC提供的三种服务 IEEE 802.3协议IEEE 802.4协议IEEE 802.5协议 高速局域网100M以太网千兆以太网万兆以太网…

ChatGPT的增长已经进入了瓶颈期

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Linux--ACL权限管理

一.ACL权限管理简介 ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xff09;是一种文件权限管理机制&#xff0c;它提供了比传统的UGO&#xff08;用户、组、其他&#xff09;权限更灵活的权限设置方式。以下是ACL的一些主要功能&#xff1a; 针对特定用户或…

查询数据库的编码集Oracle,MySQL

1、查询数据库的编码集Oracle,MySQL 1.1、oracle select * from v$nls_parameters where parameterNLS_CHARACTERSET; 查询版本&#xff1a;SELECT * FROM v$version 2、MySQL编码集 SELECT DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME FROM information_schema.SC…

18.贪心算法

排序贪心 区间贪心 删数贪心 统计二进制下有多少1 int Getbit_1(int n){int cnt0;while(n){nn&(n-1);cnt;}return cnt; }暴力加一维前缀和优化 #include <iostream> #include <climits> using namespace std; #define int long long const int N2e510; in…

来分析两道小题

一、源码 二、分析 首先它会接两个参数一个是id一个是ps&#xff0c;传递的话会包含一个flag.php&#xff0c;然后数据库连接&#xff0c;之后传递过滤&#xff0c;然后查询&#xff0c;如果查到了就会取id&#xff0c;取出来看是不是跟adog一样&#xff0c;如果是它告诉你账号…
推荐文章