
Forklift ETL Basics (Part 1)

2016-08-12
wilmosfang
Original post: http://soft.dog/2016/08/12/forklift-basic-1/

Preface

Forklift ETL is a Ruby-based toolkit for running ETL jobs against MySQL and Elasticsearch

Forklift is a ruby gem that makes it easy for you to move your data around. Forklift can be an integral part of your datawarehouse pipeline or a backup tool. Forklift can collect and collapse data from multiple sources or across a single source

For background on what ETL is, see the earlier post ETL (Extract-Transform-Load) with Kiba

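ETL consists of three main steps:

  • Data extraction: read data from the various data sources
  • Data transformation: process the data so that it fits the requirements
  • Data loading: store the result in a suitable destination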
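
To make the three steps concrete, here is a minimal sketch in plain Ruby that works on in-memory data only. It does not use Forklift or any database; SOURCE_ROWS, extract, transform and load_rows are hypothetical names made up for this illustration.

```ruby
# A toy ETL pipeline over in-memory data; no database is involved.
# All names here (SOURCE_ROWS, extract, transform, load_rows) are hypothetical.

SOURCE_ROWS = [
  { id: 1, name: ' alice ', score: '90' },
  { id: 2, name: 'BOB',     score: '72' },
  { id: 3, name: 'carol',   score: nil  }
].freeze

# Extract: read rows from a source (here, a constant array).
def extract
  SOURCE_ROWS.map(&:dup)
end

# Transform: drop incomplete rows, normalise names, convert scores to integers.
def transform(rows)
  rows.reject { |row| row[:score].nil? }
      .map { |row| row.merge(name: row[:name].strip.capitalize, score: Integer(row[:score])) }
end

# Load: append the result to a destination (here, another array).
def load_rows(rows, destination)
  destination.concat(rows)
end

warehouse = []
load_rows(transform(extract), warehouse)
warehouse.each { |row| puts "#{row[:id]} #{row[:name]} #{row[:score]}" }
```

In a real job the source and destination would be databases or indexes rather than arrays, but the shape of the pipeline stays the same.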

This post covers the basics of forklift_etl; for details, see the forklift project

Tip: at the time of writing, the latest version is forklift_etl (1.2.2)




Environment

h102

[root@h102 ~]# ruby -v
ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-linux]
[root@h102 ~]# gem --version
2.5.1
[root@h102 ~]# cat /etc/issue
CentOS release 6.6 (Final)
Kernel \r on an \m

[root@h102 ~]# uname  -a 
Linux h102.temp 2.6.32-504.el6.x86_64 #1 SMP Wed Oct 15 04:27:16 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux
[root@h102 ~]#  

h105

[root@h105 ~]# cat /etc/issue
CentOS release 6.6 (Final)
Kernel \r on an \m

[root@h105 ~]# uname -a 
Linux h105 2.6.32-504.el6.x86_64 #1 SMP Wed Oct 15 04:27:16 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux
[root@h105 ~]# mysql -u root -p 
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.6.27-76.0-log Percona Server (GPL), Release 76.0, Revision 5498987

Copyright (c) 2009-2015 Percona LLC and/or its affiliates
Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| Syslog             |
| db_d               |
| db_s               |
| mysql              |
| performance_schema |
| test               |
| testxxx            |
+--------------------+
8 rows in set (0.00 sec)

mysql>

Creating the ETL project

Configuration and dependencies

[root@h102 ~]# mkdir forklift
[root@h102 ~]# cd forklift/
[root@h102 forklift]# vim Gemfile 
[root@h102 forklift]# cat Gemfile 
source 'https://gems.ruby-china.org'


gem 'forklift_etl'
[root@h102 forklift]# bundle install 
Don't run Bundler as root. Bundler can ask for sudo if it is needed, and installing your bundle as root will break this application
for all non-root users on this machine.
Fetching gem metadata from https://gems.ruby-china.org/
Fetching version metadata from https://gems.ruby-china.org/
Fetching dependency metadata from https://gems.ruby-china.org/
Resolving dependencies...
Using i18n 0.7.0
Using json 1.8.3
Using minitest 5.9.0
Using thread_safe 0.3.5
Using multi_json 1.12.1
Using multipart-post 2.0.0
Using lumberjack 1.0.10
Using mysql2 0.4.4
Using mime-types-data 3.2016.0521
Using bundler 1.12.5
Using tzinfo 1.2.2
Using elasticsearch-api 1.1.0
Using faraday 0.9.2
Using mime-types 3.1
Using activesupport 4.2.7
Using elasticsearch-transport 1.1.0
Using mail 2.6.4
Using elasticsearch 1.1.0
Using pony 1.11
Installing forklift_etl 1.2.2
Bundle complete! 1 Gemfile dependency, 20 gems now installed.
Use `bundle show [gemname]` to see where a bundled gem is installed.
[root@h102 forklift]#

Generating the project

Run bundle exec forklift --generate to generate the project skeleton in the current directory

[root@h102 forklift]# bundle exec forklift --generate
Example plan generated
Example plan generated
Example plan generated
Example plan generated
Example plan generated
[root@h102 forklift]# ls
config  Gemfile  Gemfile.lock  log  patterns  pid  plan.rb  template  transformations  transports
[root@h102 forklift]# tree 
.
├── config
│   ├── connections
│   │   ├── csv
│   │   ├── elasticsearch
│   │   └── mysql
│   │       ├── destination.yml
│   │       └── source.yml
│   └── email.yml
├── Gemfile
├── Gemfile.lock
├── log
├── patterns
├── pid
├── plan.rb
├── template
│   └── email.erb
├── transformations
└── transports

11 directories, 7 files
[root@h102 forklift]#

Source code of the forklift command

The forklift bootstrap script

[root@h102 ~]# which forklift
/usr/local/rvm/gems/ruby-2.3.0/bin/forklift
[root@h102 ~]# cat /usr/local/rvm/gems/ruby-2.3.0/bin/forklift
#!/usr/bin/env ruby_executable_hooks
#
# This file was generated by RubyGems.
#
# The application 'forklift_etl' is installed as part of a gem, and
# this file is here to facilitate running it.
#

require 'rubygems'

version = ">= 0.a"

if ARGV.first
  str = ARGV.first
  str = str.dup.force_encoding("BINARY") if str.respond_to? :force_encoding
  if str =~ /\A_(.*)_\z/ and Gem::Version.correct?($1) then
    version = $1
    ARGV.shift
  end
end

gem 'forklift_etl', version
load Gem.bin_path('forklift_etl', 'forklift', version)
[root@h102 ~]# 

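This script handles version selection: if the first argument has the form _version_ (for example _1.2.2_) and is a valid gem version, that version is used and the argument is removed from ARGV; otherwise the default requirement >= 0.a applies, which any installed version satisfies

What actually does the work is this line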

load Gem.bin_path('forklift_etl', 'forklift', version)

which simply loads the file /usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/forklift

[root@h102 ~]# ruby -e "puts Gem.bin_path('forklift_etl', 'forklift', '>= 0.a')"
/usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/forklift
[root@h102 ~]# 
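
The argument handling in the bootstrap script can be tried in isolation. The sketch below copies the same regular expression and the same Gem::Version.correct? check into a small helper; pick_version is a hypothetical name used only here, not something RubyGems provides.

```ruby
require 'rubygems'

# Mirrors the argument handling of the RubyGems-generated stub:
# a first argument of the form _X.Y.Z_ selects a gem version and is
# removed from the argument list. `pick_version` is a hypothetical helper.
def pick_version(argv, default = '>= 0.a')
  first = argv.first
  if first && (match = first.match(/\A_(.*)_\z/)) && Gem::Version.correct?(match[1])
    argv.shift
    return match[1]
  end
  default
end

args = ['_1.2.2_', 'plan.rb']
puts pick_version(args)  # the pinned version taken from the first argument
puts args.inspect        # the version argument has been shifted off

args = ['plan.rb']
puts pick_version(args)  # no version argument, so the default requirement is kept
```

So a call such as forklift _1.2.2_ plan.rb would pin the gem version, while forklift plan.rb just uses whatever version is installed.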

The forklift wrapper script

[root@h102 ~]# cat /usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/forklift
#!/usr/bin/env ruby

require 'rubygems'
require 'fileutils'

begin
  require 'forklift'
rescue LoadError
  require "#{File.expand_path(File.dirname(__FILE__))}/../lib/forklift.rb"
end

def generate
  p = Dir.pwd

  Dir.mkdir "#{p}/config"
  Dir.mkdir "#{p}/config/connections"
  Dir.mkdir "#{p}/config/connections/mysql"
  Dir.mkdir "#{p}/config/connections/elasticsearch"
  Dir.mkdir "#{p}/config/connections/csv"
  Dir.mkdir "#{p}/log"
  Dir.mkdir "#{p}/pid"
  Dir.mkdir "#{p}/template"
  Dir.mkdir "#{p}/transformations"
  Dir.mkdir "#{p}/transports"
  Dir.mkdir "#{p}/patterns"

  template('source.yml',      "#{p}/config/connections/mysql/source.yml")
  template('destination.yml', "#{p}/config/connections/mysql/destination.yml")
  template('email.yml',       "#{p}/config/email.yml")
  template('email.erb',       "#{p}/template/email.erb")
  template('plan.rb',         "#{p}/plan.rb")
end

def template(source, destination)
  t = "#{File.expand_path(File.dirname(__FILE__))}/../template"
  FileUtils.copy("#{t}/#{source}", destination)
  puts "Example plan generated"
end


def run_plan
  file = "#{Dir.pwd}/#{ARGV[0]}"
  if ARGV[0].nil? 
    puts "[error] Please provide a plan.rb as the first argument"
    exit(1)
  end
  Dir.chdir File.expand_path(File.dirname(ARGV[0]))
  begin
    require 'bundler'
    Bundler.require(:default)
  rescue Exception => e
    puts "cannot load bundler: #{e}"
  end
  require file
end

############

if ['--generate', '-generate'].include?(ARGV[0])
  generate
else
  run_plan
end
[root@h102 ~]# 

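This script does three things:

  • Loads a file
  • Defines a generate method
  • Defines a run_plan method

Loading a file

The script first tries require 'forklift'; if that raises LoadError, it falls back to the following path, relative to the script itself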

[root@h102 bin]# ls
forklift  x
[root@h102 bin]# pwd
/usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin
[root@h102 bin]# cat x
puts "#{File.expand_path(File.dirname(__FILE__))}/../lib/forklift.rb"
[root@h102 bin]# ruby x
/usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/../lib/forklift.rb
[root@h102 bin]# ll /usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/../lib/forklift.rb
-rwxr-xr-x 1 root rvm 651 Aug  9 15:12 /usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/../lib/forklift.rb
[root@h102 bin]# 
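
The require-then-fall-back pattern used at the top of the wrapper can be reproduced with a throwaway file. This is only a sketch of the pattern: require_with_fallback and demo_fallback_lib are hypothetical names, and the temporary file stands in for lib/forklift.rb.

```ruby
require 'tmpdir'

# Try the library by name first; if it is not on the load path, fall back
# to an explicit file path. `require_with_fallback` is a hypothetical helper.
def require_with_fallback(name, fallback_path)
  require name
  :load_path
rescue LoadError
  require fallback_path
  :fallback
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'demo_fallback_lib.rb')
  File.write(path, "DEMO_FALLBACK_LOADED = true\n")
  # 'demo_fallback_lib' is not an installed library, so the explicit path is used.
  puts require_with_fallback('demo_fallback_lib', path)
end
```

With the gem installed normally the first require succeeds; the fallback presumably matters when the script is run straight from a source checkout.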

The generate method

def generate
  p = Dir.pwd

  Dir.mkdir "#{p}/config"
  Dir.mkdir "#{p}/config/connections"
  Dir.mkdir "#{p}/config/connections/mysql"
  Dir.mkdir "#{p}/config/connections/elasticsearch"
  Dir.mkdir "#{p}/config/connections/csv"
  Dir.mkdir "#{p}/log"
  Dir.mkdir "#{p}/pid"
  Dir.mkdir "#{p}/template"
  Dir.mkdir "#{p}/transformations"
  Dir.mkdir "#{p}/transports"
  Dir.mkdir "#{p}/patterns"

  template('source.yml',      "#{p}/config/connections/mysql/source.yml")
  template('destination.yml', "#{p}/config/connections/mysql/destination.yml")
  template('email.yml',       "#{p}/config/email.yml")
  template('email.erb',       "#{p}/template/email.erb")
  template('plan.rb',         "#{p}/plan.rb")
end

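The whole process creates a number of directories under the current directory and then copies the template files into them

It relies on the template method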

def template(source, destination)
  t = "#{File.expand_path(File.dirname(__FILE__))}/../template"
  FileUtils.copy("#{t}/#{source}", destination)
  puts "Example plan generated"
end

Its job is to copy a template file to the target location; the templates live in the gem's template directory

[root@h102 bin]# pwd
/usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin
[root@h102 bin]# ll ../template/
total 20
-rwxr-xr-x 1 root rvm  89 Aug  9 15:12 destination.yml
-rwxr-xr-x 1 root rvm  29 Aug  9 15:12 email.erb
-rwxr-xr-x 1 root rvm 538 Aug  9 15:12 email.yml
-rwxr-xr-x 1 root rvm 150 Aug  9 15:12 plan.rb
-rwxr-xr-x 1 root rvm  84 Aug  9 15:12 source.yml
[root@h102 bin]#
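
One thing worth noting about generate is that Dir.mkdir raises Errno::EEXIST when the directory already exists, so running it twice in the same directory would fail on the first mkdir. The sketch below is not the gem's code; it is a hypothetical variant (scaffold, SCAFFOLD_DIRS) that uses FileUtils.mkdir_p so that it can be re-run, exercised in a temporary directory with a placeholder template.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical re-implementation of the scaffold step, for illustration only.
SCAFFOLD_DIRS = %w[
  config/connections/mysql
  config/connections/elasticsearch
  config/connections/csv
  log pid template transformations transports patterns
].freeze

# Creates the directory tree under `root` and copies each template file.
# `files` maps a template file name to its destination relative to `root`.
def scaffold(root, template_dir, files)
  SCAFFOLD_DIRS.each { |dir| FileUtils.mkdir_p(File.join(root, dir)) }
  files.each do |source, destination|
    FileUtils.cp(File.join(template_dir, source), File.join(root, destination))
  end
end

Dir.mktmpdir do |tmp|
  templates = File.join(tmp, 'template_src')
  project   = File.join(tmp, 'project')
  FileUtils.mkdir_p(templates)
  File.write(File.join(templates, 'plan.rb'), "# placeholder plan\n")

  # Running twice does not raise, because mkdir_p tolerates existing directories.
  2.times { scaffold(project, templates, 'plan.rb' => 'plan.rb') }
  puts Dir.children(project).sort.inspect
end
```

The trade-off is that a re-run silently overwrites copied files, which the original avoids by failing early.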

The run_plan method

def run_plan
  file = "#{Dir.pwd}/#{ARGV[0]}"
  if ARGV[0].nil?
    puts "[error] Please provide a plan.rb as the first argument"
    exit(1)
  end
  Dir.chdir File.expand_path(File.dirname(ARGV[0]))
  begin
    require 'bundler'
    Bundler.require(:default)
  rescue Exception => e
    puts "cannot load bundler: #{e}"
  end
  require file
end

The key line in this method is require file

file is the plan.rb in the current directory (the name is not fixed; any file name works), passed as the first argument after forklift
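
The path handling in run_plan is easy to misread, because file is computed from the current directory before the script changes into the plan's directory. The sketch below reproduces only that path arithmetic; resolve_plan and the /srv/etl paths are hypothetical, and nothing is actually required or chdir'ed.

```ruby
# Hypothetical helper that reproduces only the path arithmetic of run_plan;
# it does not change directory or require anything.
def resolve_plan(argv, pwd)
  return nil if argv.empty?

  plan_arg = argv[0]
  {
    file:    "#{pwd}/#{plan_arg}",                           # what gets required
    workdir: File.expand_path(File.dirname(plan_arg), pwd)   # where the plan runs
  }
end

r = resolve_plan(['jobs/nightly.rb'], '/srv/etl')
puts r[:file]     # /srv/etl/jobs/nightly.rb
puts r[:workdir]  # /srv/etl/jobs
```

Since the argument is always appended to the current directory, the plan path is effectively expected to be relative; an absolute path would end up glued onto the current directory as well.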


Command summary

  • mkdir forklift
  • cd forklift/
  • vim Gemfile
  • cat Gemfile
  • bundle install
  • bundle exec forklift --generate
  • tree
  • which forklift
  • cat /usr/local/rvm/gems/ruby-2.3.0/bin/forklift
  • ruby -e "puts Gem.bin_path('forklift_etl', 'forklift', '>= 0.a')"
  • cat /usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/forklift
  • ruby x
  • ll /usr/local/rvm/gems/ruby-2.3.0/gems/forklift_etl-1.2.2/bin/../lib/forklift.rb
  • ll ../template/
