一个码农

在大数据边缘试探 开始使用

java 调度 Kettle 时使用 jndi 连接数据库

看了 Kettle 好久了,不得不说,国内资料真的是少啊,又是买书,又是自己翻译官方文档,又是看源码,好歹算是有些眉目了,今天有解决一大难题,在此记录下来,算是为国内 Kettle 圈奉献点力量吧。

Java 调度 Kettle

关于 Java 调度 Kettle,在网上还是能搜到一些内容的,其实还是比较简单的,Kettle 在 Spoon 中设计以后,会生成。ktr 和。kjb 文件,分别对应 Kettle 的转换和作业。Java 正是通过 Kettle 提供的 API 调用这些文件的。

准备工作

在通过 Java 调用 Kettle 时,需要下面几个 jar 包:

<dependency>
		  <groupId>pentaho-kettle</groupId>
		  <artifactId>kettle-engine</artifactId>
		  <version>${kettle.version}</version>
	  </dependency>
	  <dependency>
		  <groupId>pentaho</groupId>
		  <artifactId>metastore</artifactId>
		  <version>${kettle.version}</version>
	  </dependency>
	  <dependency>
		  <groupId>pentaho-kettle</groupId>
		  <artifactId>kettle-core</artifactId>
		  <version>${kettle.version}</version>
		  <exclusions>
			  <exclusion>
				  <groupId>jug-lgpl</groupId>
				  <artifactId>jug-lgpl</artifactId>
			  </exclusion>
			  <exclusion>
				  <groupId>secondstring</groupId>
				  <artifactId>secondstring</artifactId>
			  </exclusion>
			  <exclusion>
				  <artifactId>xercesImpl</artifactId>
				  <groupId>xerces</groupId>
			  </exclusion>
			  <exclusion>
				  <groupId>org.apache.xmlgraphics</groupId>
				  <artifactId>batik-js</artifactId>
			  </exclusion>
		  </exclusions>
	  </dependency>

这些 jar 包在 maven 仓库是没有的,所以还需要配上 kettle 的 maven 仓库地址:

 <!-- kettle中央仓库 -->
    <repositories>
        <repository>
            <id>pentaho-public</id>
            <name>Pentaho Public</name>
            <url>http://nexus.pentaho.org/content/groups/omni</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

配置好 maven 依赖以后,就可以通过 Java 来操作 kettle 的。kjb 和。ktr 文件了。

连接资源库

在 kettle 中,有一个很重要的概念就是资源库,资源库通俗理解就是管理 kettle 元数据,在多人开发的时候尤为重要,可以把资源库放在公共的地方供大家访问,目前 kettle 支持三种资源库:

  • 数据库资源库
  • 文件资源库
  • Pentaho 资源库

本文主要记录 Java 调用时使用 jndi 连接数据库的问题,Kettle 概念就不多赘述了如果需要了解,可以查看本人呢还未翻译完全的非官方文档.

在 Java 调用时,首先就是连接资源库,本人为了方便 SVN 版本控制,使用的是文件资源库。

获取资源库

以下代码仅仅是获取文件资源库的:

    /**
     *
     * @param basePath 主路径
     * @return KettleFileRepository
     */
    public static KettleFileRepository getFileRepository(String basePath){

        KettleFileRepositoryMeta meta = new KettleFileRepositoryMeta();
        URL url = KettleUtils.class.getClassLoader().getResource(basePath);
        String baseDirectory = url == null ? null : url.getPath();
        meta.setBaseDirectory(baseDirectory);
        KettleFileRepository kettleFileRepository = new KettleFileRepository();
        kettleFileRepository.init(meta);
        return kettleFileRepository;
    }

其中 ,通过

URL url = KettleUtils.class.getClassLoader().getResource(basePath);
        String baseDirectory = url == null ? null : url.getPath();

获取到资源库路径,我的资源库是直接放在项目目录的,也可以使用绝对路径。

连接资源库

在连接资源库进行操作之前,需要先进行初始化,而想要使用 jndi 连接数据库,也就需要在这个地方做文章了,首先,kettle 初始化的代码是
KettleEnvironment.init(true); 就这么简单一句,即可,kettle 会帮你完成初始化。
在这个 init 方法,需要一个 boolean 类型的参数,这个参数就是告诉 kettle 需要初始化 jndi 数据库配置文件。
首先看以下这个方法的源码:

 /**
   * Initializes the Kettle environment. This method performs the following operations:
   * <p/>
   * - Creates a Kettle "home" directory if it does not already exist - Reads in the kettle.properties file -
   * Initializes the logging back-end - Sets the console log level to debug - If specified by parameter, configures
   * Simple JNDI - Registers the native types and the plugins for the various plugin types - Reads the list of variables
   * - Initializes the Lifecycle listeners
   *
   * @param simpleJndi true to configure Simple JNDI, false otherwise
   * @throws KettleException Any errors that occur during initialization will throw a KettleException.
   */
  public static void init( boolean simpleJndi ) throws KettleException {
    init( Arrays.asList(
      RowDistributionPluginType.getInstance(),
      StepPluginType.getInstance(),
      StepDialogFragmentType.getInstance(),
      PartitionerPluginType.getInstance(),
      JobEntryPluginType.getInstance(),
      JobEntryDialogFragmentType.getInstance(),
      LogTablePluginType.getInstance(),
      RepositoryPluginType.getInstance(),
      LifecyclePluginType.getInstance(),
      KettleLifecyclePluginType.getInstance(),
      ImportRulePluginType.getInstance(),
      CartePluginType.getInstance(),
      CompressionPluginType.getInstance(),
      AuthenticationProviderPluginType.getInstance(),
      AuthenticationConsumerPluginType.getInstance(),
      EnginePluginType.getInstance()
    ), simpleJndi );
  }

可以看到,这个参数被传递到下个方法,我们继续往下看:
下面这个方法代码有点多,我只取主要的部分:

// Configure Simple JNDI when we run in stand-alone mode (spoon, pan, kitchen, carte, ... NOT on the platform
        //
        if ( simpleJndi ) {
          JndiUtil.initJNDI();
        }

可以看到,在代码里,判断了前面那个参数的值,如果是 true,就调用 JndiUtil.initJNDI(); 进行 jndi 的初始化,JndiUtil 类的代码:

public class JndiUtil {

  public static void initJNDI() throws KettleException {
    String path = Const.JNDI_DIRECTORY;

    if ( path == null || path.equals( "" ) ) {
      try {
        File file = new File( "simple-jndi" );
        path = file.getCanonicalPath();
      } catch ( Exception e ) {
        throw new KettleException( "Error initializing JNDI", e );
      }
      Const.JNDI_DIRECTORY = path;
    }

    System.setProperty( "java.naming.factory.initial", "org.osjava.sj.SimpleContextFactory" );
    System.setProperty( "org.osjava.sj.root", path );
    System.setProperty( "org.osjava.sj.delimiter", "/" );
  }

}

这里就是初始化 jndi 的部分了,这里 String path = Const.JNDI_DIRECTORY; 是 kettle 默认的 jndi 配置文件的位置,所以我们只需要将这个 值 修改了,就可以让 kettle 加载我们的配置文件,也就可以使用 jndi 数据源了。

于是我进行了尝试,直接在初始化之前,给 Const.JNDI_DIRECTORY 赋值,证明猜想确实是对的。

URL url = KettleUtils.class.getClassLoader().getResource(JNDI_PATH);
Const.JNDI_DIRECTORY = url == null ? null : url.getPath();

JNDI_PATH 是我 jndi 配置文件的目录名。

下面,将 jndi 连接名作为变量传给 kettle,就可以实现通过 通过变量连接 jndi 数据库了。

job.setVariable("jndiName","ODS");

之后,在 kettle 中,只需要获取变量名对应的连接即可:
image.png

这样即可实现多个数据源通过变量切换。

在获取 到资源库对象,并且已经 初始化 kettle 以后,直接通过 connect 方法就可以进行连接了,文件资源库默认是没有密码的,如果是数据库资源库,默认的账户和密码是 admin/admin

//连接资源库
repository.connect(null,null);

下面,就可以开心的调用 kettle 文件了:

获取作业

 RepositoryDirectoryInterface directoryInterface = repository.loadRepositoryDirectoryTree();
            JobMeta jobMeta = repository.loadJob(jobName,directoryInterface,null,null);

设置变量并执行 kettle 作业

Job job = new Job(repository,jobMeta);
            job.setVariable("jndiName","ODS");
            //接口参数设置为变量
            for(Map.Entry<String,String> entry:params.entrySet()){
                job.setVariable(entry.getKey(),entry.getValue());
                System.out.println("正在设置变量,key:"  + entry.getKey() + ",值为:" + entry.getValue());
            }
            //日志级别
            job.setLogLevel(LogLevel.ROWLEVEL);
            //执行作业

现在主要做大数据,kettle 只作为入门的工具,后面可能还会设计 Hadoop、spark 之类的,欢迎一起沟通交流。

一个可菜可菜的码农

评论
5 评论
gitors • 2019-06-29
回复 删除

附上全部的核心调用代码:

 /**
     *
     * @param basePath 主路径
     * @return KettleFileRepository
     */
    public static KettleFileRepository getFileRepository(String basePath){

        KettleFileRepositoryMeta meta = new KettleFileRepositoryMeta();
        URL url = KettleUtils.class.getClassLoader().getResource(basePath);
        String baseDirectory = url == null ? null : url.getPath();
        meta.setBaseDirectory(baseDirectory);
        KettleFileRepository kettleFileRepository = new KettleFileRepository();
        kettleFileRepository.init(meta);
        return kettleFileRepository;
    }

    /**
     * 执行作业
     * @param params 参数
     * @param repository 仓库
     * @param jobName 作业名
     */
    public static void executeJob(KettleParams params, KettleFileRepository repository , String jobName){
        try {

            URL url = KettleUtils.class.getClassLoader().getResource(JNDI_PATH);

            Const.JNDI_DIRECTORY = url == null ? null : url.getPath();

            KettleEnvironment.init(true);
            //连接资源库
            repository.connect(null,null);
            RepositoryDirectoryInterface directoryInterface = repository.loadRepositoryDirectoryTree();
            JobMeta jobMeta = repository.loadJob(jobName,directoryInterface,null,null);

            Job job = new Job(repository,jobMeta);
            job.setVariable("jndiName","ODS");
            //接口参数设置为变量
            for(Map.Entry<String,String> entry:params.entrySet()){
                job.setVariable(entry.getKey(),entry.getValue());
                System.out.println("正在设置变量,key:"  + entry.getKey() + ",值为:" + entry.getValue());
            }
            //日志级别
            job.setLogLevel(LogLevel.ROWLEVEL);
            //执行作业
            job.run();
        }catch (KettleException ke){
            logger.error("执行Kettle作业发生错误:",ke);
        }
    }
EricTao2 • 2019-07-10
回复 删除

赞一个!并且提一嘴,Java 可以直接写代码动态生成 Kettle 作业执行的,我们使用的是这种。当初也是资料找不到没使用你这种方法。

chenghuan10081 • 2019-08-30
回复 删除

ppp

chenghuan10081 • 2019-08-30
回复 删除
chenghuan10081 • 2019-09-01
回复 删除

shishi