从Oracle用goldengate抽取数据到kafka-创新互联

Goldengate到kafka配置详解

站在用户的角度思考问题,与客户深入沟通,找到金秀网站设计与金秀网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:做网站、成都网站设计、企业官网、英文网站、手机端网站、网站推广、域名申请、网络空间、企业邮箱。业务覆盖金秀地区。

环境介绍:

源端数据库版本

源端OGG版本

目标端OGG版本

Kafka集群

目标端数据库GP

11.2.0.3

12.2.0.1.1

ggs_Adapters_Linux_x64

切记OGG版本是for big data

12.3.0.1.0

源端配置:

1.1安装OGG软件。

OGG软件不做要求12版本即可

配置MGR

PORT 7810

DYNAMICPORTLIST 7811-7914

AUTORESTART REPLICAT dpe*, WAITMINUTES 1, RETRIES 5

AUTORESTART REPLICAT ext*, WAITMINUTES 1, RETRIES 5

PURGEOLDEXTRACTS /home/ogg/kafka_ogg/dirdat/kf*,USECHECKPOINTS, minkeephours 6

配置ext抽取进程参数:

EXTRACT extkaf

--setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)

userid goldengate@ogg ,password  Golden_1230

--getupdatebefores

GETTRUNCATES

REPORTCOUNT EVERY 15 MINUTES, RATE

DISCARDFILE ./dirrpt/extkaf.dsc,APPEND,MEGABYTES 1024

--THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000 IOLATENCY 90000

DBOPTIONS  ALLOWUNUSEDCOLUMN

--WARNLONGTRANS 2h,CHECKINTERVAL 3m

EXTTRAIL ./dirdat/kf

-- TRANLOGOPTIONS  CONVERTUCS2CLOBS

TRANLOGOPTIONS EXCLUDEUSER goldengate

TRANLOGOPTIONS DBLOGREADER

-- TRANLOGOPTIONS _noReadAhead Any

--DYNAMICRESOLUTION

Table schema1.tablename1;

Table schema1.tablename2;

Table schema1.tablename3 ;

Table4schema1.tablename4;

配置投递进程:

extract dpekaf

rmthost 172.31.31.10,mgrport 7810

passthru

numfiles 500

rmttrail /home/ogg/kafka_ogg/dirdat/kf

------------------------------------------------------------       

Table schema1.tablename1;

Table schema1.tablename2;

Table schema1.tablename3 ;

Table4schema1.tablename4;

-----===========================目标端配置==================

Goldengate  for  big data

目标端配置MGR:

PORT 7810

DYNAMICPORTLIST 7811-7914

AUTORESTART REPLICAT rep*, WAITMINUTES 1, RETRIES 5

PURGEOLDEXTRACTS /home/ogg/kafka_ogg/dirdat/kf*,USECHECKPOINTS, minkeephours 6

配置replicat进程:

入库进程1:

replicat repykaf1

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka1.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename1;

入库进程2:

replicat repykaf2

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka2.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename2;

入库进程3:

replicat repykaf3

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka3.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename3;

入库进程4:

replicat repykaf4

--setenv (JAVA_HOME=/home/ogg/jdk1.8.0_111)

--setenv (JRE_HOME=/home/ogg/jdk1.8.0_111/jre)

--setenv (PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH)

--setenv (CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib)

--setenv (LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:/home/ogg/kafka_ogg/lib)

--getenv (JAVA_HOME)

--getenv (JRE_HOME)

--getenv (CLASSPATH)

--getenv (LD_LIBRARY_PATH)

--getenv (PATH)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka4.props

GETTRUNCATES

REPORTCOUNT EVERY 1 MINUTES ,RATE

GROUPTRANSOPS 1000

Table schema1.tablename4;

配置去kafka的参数文件

我的OGG解压目录是:kafka_ogg

在OGG的解压目录下有:AdapterExamples 文件夹

从Oracle用goldengate抽取数据到kafka

cp /home/ogg/kafka_ogg/AdapterExamples/big-data/kafka/*   /home/ogg/kafka_ogg/dirprm/

编辑:

Vi   custom_kafka_producer.properties

###bootstrap.servers=ip:端口,ip:端口 例子

bootstrap.servers=172.31.31.10:6667,172.31.31.11:6667,172.31.31.12:6667,172.31.31.13:6667

acks=1

reconnect.backoff.ms=1000

compression.type=gzip

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# 100KB per partition

batch.size=102400

linger.ms=10000

max.request.size=10240000

send.buffer.bytes=10240000                                                                                                                                          

编辑

Kafka1.props

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

gg.handler.kafkahandler.topicMappingTemplate=topic-name

(这里写你创建的topic  name )

gg.handler.kafkahandler.format=json

gg.handler.kafkahandler.BlockingSend =false

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode=tx

goldengate.userexit.timestamp=utc

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/home/ogg/kafka_ogg/ggjava/resources/lib/*:/usr/hdp/2.4.0.0-169/kafka/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

goldengate.userexit.utf8mode=true

gg.handler.kafkahandler.keyMappingTemplate=HH

gg.handler.kafkahandler.format.includePrimaryKeys=true


本文标题:从Oracle用goldengate抽取数据到kafka-创新互联
网页网址:http://azwzsj.com/article/gjede.html