Hadoop UserGroupInformation详解

下面大概了解下面Java的认证相关框架

JAAS 认证和授权框架,只要负责用户的认证和权限。

SASL client 和 server之间认证的框架

GSS 是sasl的一个provider,也就是实现了sasl框架

参考JAAS/GSS-API/SASL/Kerberos简介 | NoSQL漫谈

网上关于high level介绍的还比较多,可以搜索一些,但是要真正理解UserGroupInfomration的功能,还是需要研究 UserGroupInformaiton在 hadoop/hive等大数据系统中如何使用的。

介绍UserGroupInformation 之前,应该了解一下 JaaS框架里 Subject这个概念,简单理解就是代表了一个用户。当通过JAAS登录成功之后,会产生一个subject, 里面包含了一组 Principal, 和其他凭证信息,不如如果是Kerbos认证的话,就会有一个Kerbos身份的principal, 以及把tgt等Credentials的信息放到  privCredentials中。

CASE ONE, 客户端和服务端建立连接

当client 跟Server端建立连接的时候,一般会使用

UserGroupInfomation.doAs(action {

   建立连接的代码

}) 

doAs就会使用UserGroupInformation当前subject执行

这样在建立连接的过程中,就是用了登录后的subject 来建立链接,因为subject包含了kerbos的Principal, 并且拥有合法的Credentials,  sasl client和 sasl server在 建立连接的过程中就会使用到。 真实的底层使用的是Gss的实现。

doAs 里面的实现逻辑

@InterfaceAudience.Public@InterfaceStability.Evolvingpublic  T doAs(PrivilegedAction action) {if (LOG.isDebugEnabled()) {LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action,new Exception());}//使用当前subject执行action,也就是建立连接的时候使用登录过的subjectreturn Subject.doAs(subject, action);}

当然,如果没有使用UserGroupInformation.doAs的时候,

建立sasl连接的时候使用的就是当前安全上线文里的subject, 用户已经使用Jaas登录过的,登录之后,subject的信息已经存在AccessConect中了,所以也会使用登录后的subject。

CASE TWO 在服务端执行ACTION

连接到服务端之后,一般服务端有两个UGI,一个是服务本身的UGI,比如NN 一般是hdfs, 或者MetaStore, 一般是hive。 hdfs或者 hive用户是服务里的超级用户,客户端一般创建连接之后,执行的操作的用户是 client的用户,这时候就会用到UserGroupInformation里面 proxyUser的概念。

proxyUser顾名思义,就是代理一个用户执行操作。切记,被代理的用户在服务端是没有通过JAAS 认证过的用户,只是给一个用户名而已。 不过这个用户在客户端已经通过JAAS认证过了。

创建代理用户的代码如下:

/*** Create a proxy user using username of the effective user and the ugi of the* real user.* @param user* @param realUser* @return proxyUser ugi*/@InterfaceAudience.Public@InterfaceStability.Evolvingpublic static UserGroupInformation createProxyUser(String user,UserGroupInformation realUser) {if (user == null || user.isEmpty()) {throw new IllegalArgumentException("Null user");}if (realUser == null) {throw new IllegalArgumentException("Null real user");}Subject subject = new Subject();Set principals = subject.getPrincipals();principals.add(new User(user, AuthenticationMethod.PROXY, null));principals.add(new RealUser(realUser));return new UserGroupInformation(subject);}

代理用户的UserGroupInformation里的subject是创建出来的,里面设置了一个 User的principal, 以及一个 RealUser的principal. 这个RealUser的principal是拥有kerbos认证过的,拥有kerbos秘钥。

代理用户的 subject里,没有任何秘钥信息。

如果用户被代理了,一般在服务端执行action的时候都会使用下面格式

UserGroupInformation.doAs

这样在需要权限认证的地方,就获取到的是代理用户的用户名。

切记,不能用代理用户创建 sasl的connection, 因为没有凭证,会失败的。

获取当前用户

UserGroupInformation.getCurrentUser

/*** Return the current user, including any doAs in the current stack.* @return the current user* @throws IOException if login fails*/@InterfaceAudience.Public@InterfaceStability.Evolvingpublic static UserGroupInformation getCurrentUser() throws IOException {ensureInitialized();AccessControlContext context = AccessController.getContext();Subject subject = Subject.getSubject(context);if (subject == null || subject.getPrincipals(User.class).isEmpty()) {return getLoginUser();} else {return new UserGroupInformation(subject);}}

从安全上下文获取当前的subject, 如果没有,就获取 getLoginUser, 这时候就触发登录操作。

注意1  里面有一个判断条件是 subject.getPrincipals(User.class).isEnpty()

也就是只有subject里有User这种principals才算是登录过的。

注意2,第一行执行了ensuerInitialized(),这个方法很重要,初始化了hadoop的安全环境.

登录

/*** Get the currently logged in user.  If no explicit login has occurred,* the user will automatically be logged in with either kerberos credentials* if available, or as the local OS user, based on security settings.* @return the logged in user* @throws IOException if login fails*/@InterfaceAudience.Public@InterfaceStability.Evolvingpublic static UserGroupInformation getLoginUser() throws IOException {ensureInitialized();UserGroupInformation loginUser = loginUserRef.get();// a potential race condition exists only for the initial creation of// the login user.  there's no need to penalize all subsequent calls// with sychronization overhead so optimistically create a login user// and discard if we lose the race.if (loginUser == null) {UserGroupInformation newLoginUser = createLoginUser(null);do {// it's extremely unlikely that the login user will be non-null// (lost CAS race), but be nulled before the subsequent get, but loop// for correctness.if (loginUserRef.compareAndSet(null, newLoginUser)) {loginUser = newLoginUser;// only spawn renewal if this login user is the winner.loginUser.spawnAutoRenewalThreadForUserCreds(false);} else {loginUser = loginUserRef.get();}} while (loginUser == null);}return loginUser;}

如果没有loginUser,就调用 createLoginUser(null)

private staticUserGroupInformation createLoginUser(Subject subject) throws IOException {//登录UserGroupInformation realUser = doSubjectLogin(subject, null);UserGroupInformation loginUser = null;try {//代理用户的一些设置// If the HADOOP_PROXY_USER environment variable or property// is specified, create a proxy user as the logged in user.String proxyUser = System.getenv(HADOOP_PROXY_USER);if (proxyUser == null) {proxyUser = System.getProperty(HADOOP_PROXY_USER);}loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);//从配置的文件中获取Token// Load tokens from filesfinal Collection tokenFileLocations = new LinkedHashSet<>();tokenFileLocations.addAll(getTrimmedStringCollection(System.getProperty(HADOOP_TOKEN_FILES)));tokenFileLocations.addAll(getTrimmedStringCollection(conf.get(HADOOP_TOKEN_FILES)));tokenFileLocations.addAll(getTrimmedStringCollection(System.getenv(HADOOP_TOKEN_FILE_LOCATION)));for (String tokenFileLocation : tokenFileLocations) {if (tokenFileLocation != null && tokenFileLocation.length() > 0) {File tokenFile = new File(tokenFileLocation);LOG.debug("Reading credentials from location {}",tokenFile.getCanonicalPath());if (tokenFile.exists() && tokenFile.isFile()) {Credentials cred = Credentials.readTokenStorageFile(tokenFile, conf);LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),tokenFile.getCanonicalPath());loginUser.addCredentials(cred);} else {LOG.info("Token file {} does not exist",tokenFile.getCanonicalPath());}}}// Load tokens from base64 encodingfinal Collection tokensBase64 = new LinkedHashSet<>();tokensBase64.addAll(getTrimmedStringCollection(System.getProperty(HADOOP_TOKENS)));tokensBase64.addAll(getTrimmedStringCollection(conf.get(HADOOP_TOKENS)));tokensBase64.addAll(getTrimmedStringCollection(System.getenv(HADOOP_TOKEN)));int numTokenBase64 = 0;for (String tokenBase64 : tokensBase64) {if (tokenBase64 != null && tokenBase64.length() > 0) {try {Token token = new Token<>();token.decodeFromUrlString(tokenBase64);Credentials cred = new Credentials();cred.addToken(token.getService(), token);loginUser.addCredentials(cred);numTokenBase64++;} catch (IOException ioe) {LOG.error("Cannot add token {}: {}",tokenBase64, ioe.getMessage());}}}if (numTokenBase64 > 0) {LOG.debug("Loaded {} base64 tokens", numTokenBase64);}} catch (IOException ioe) {LOG.debug("Failure to load login credentials", ioe);throw ioe;}LOG.debug("UGI loginUser: {}", loginUser);return loginUser;}

/*** Login a subject with the given parameters.  If the subject is null,* the login context used to create the subject will be attached.* @param subject to login, null for new subject.* @param params for login, null for externally managed ugi.* @return UserGroupInformation for subject* @throws IOException*/private static UserGroupInformation doSubjectLogin(Subject subject, LoginParams params) throws IOException {ensureInitialized();// initial default login.if (subject == null && params == null) {params = LoginParams.getDefaults();}HadoopConfiguration loginConf = new HadoopConfiguration(params);try {//这里是关键的一行代码,设置了loginContext的name, //HadoopConfiguration.KERBEROS_CONFIG_NAME 或者simpleHadoopLoginContext login = newLoginContext(authenticationMethod.getLoginAppName(), subject, loginConf);//JAAS的loginContext的loginlogin.login();//JAAS login之后,会产出一个subject.这个subject里包含了kerbos的认证信息UserGroupInformation ugi = new UserGroupInformation(login.getSubject());// attach login context for relogin unless this was a pre-existing// subject.if (subject == null) {params.put(LoginParam.PRINCIPAL, ugi.getUserName());ugi.setLogin(login);ugi.setLastLogin(Time.now());}return ugi;} catch (LoginException le) {KerberosAuthException kae =new KerberosAuthException(FAILURE_TO_LOGIN, le);if (params != null) {kae.setPrincipal(params.get(LoginParam.PRINCIPAL));kae.setKeytabFile(params.get(LoginParam.KEYTAB));kae.setTicketCacheFile(params.get(LoginParam.CCACHE));}throw kae;}}

看一下HadoopLoginContext的代码

// wrapper to allow access to fields necessary to recreate the same login// context for relogin.  explicitly private to prevent external tampering.private static class HadoopLoginContext extends LoginContext {private final String appName;private final HadoopConfiguration conf;private AtomicBoolean isLoggedIn = new AtomicBoolean();HadoopLoginContext(String appName, Subject subject,HadoopConfiguration conf) throws LoginException {//这里设置里LoginContext的name以及confsuper(appName, subject, null, conf);this.appName = appName;this.conf = conf;}

LoginContext.init

public LoginContext(String name, Subject subject,CallbackHandler callbackHandler,Configuration config) throws LoginException {this.config = config;if (config != null) {creatorAcc = java.security.AccessController.getContext();}//loginContext初始化,name是外层传过来的init(name);if (subject != null) {this.subject = subject;subjectProvided = true;}if (callbackHandler == null) {loadDefaultCallbackHandler();} else if (creatorAcc == null) {this.callbackHandler = new SecureCallbackHandler(java.security.AccessController.getContext(),callbackHandler);} else {this.callbackHandler = callbackHandler;}}
private void init(String name) throws LoginException {SecurityManager sm = System.getSecurityManager();if (sm != null && creatorAcc == null) {sm.checkPermission(new AuthPermission("createLoginContext." + name));}if (name == null)throw new LoginException(ResourcesMgr.getString("Invalid.null.input.name"));// get the Configurationif (config == null) {config = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction() {public Configuration run() {return Configuration.getConfiguration();}});}//划重点,这里根据name获取对应的 AppConfigurationEntry// get the LoginModules configured for this applicationAppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);if (entries == null) {if (sm != null && creatorAcc == null) {sm.checkPermission(new AuthPermission("createLoginContext." + OTHER));}entries = config.getAppConfigurationEntry(OTHER);if (entries == null) {MessageFormat form = new MessageFormat(ResourcesMgr.getString("No.LoginModules.configured.for.name"));Object[] source = {name};throw new LoginException(form.format(source));}}moduleStack = new ModuleInfo[entries.length];for (int i = 0; i < entries.length; i++) {// clone returned arraymoduleStack[i] = new ModuleInfo(new AppConfigurationEntry(entries[i].getLoginModuleName(),entries[i].getControlFlag(),entries[i].getOptions()),null);}contextClassLoader = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction() {public ClassLoader run() {ClassLoader loader =Thread.currentThread().getContextClassLoader();if (loader == null) {// Don't use bootstrap class loader directly to ensure// proper package access control!loader = ClassLoader.getSystemClassLoader();}return loader;}});}

上面提到的根据name获取的 AppConfigurationEntry 是在UserGroupInformation里自定义的

/*** A JAAS configuration that defines the login modules that we want* to use for login.*/@InterfaceAudience.Private@InterfaceStability.Unstableprivate static class HadoopConfigurationextends javax.security.auth.login.Configuration {static final String KRB5_LOGIN_MODULE =KerberosUtil.getKrb5LoginModuleName();static final String SIMPLE_CONFIG_NAME = "hadoop-simple";static final String KERBEROS_CONFIG_NAME = "hadoop-kerberos";private static final Map BASIC_JAAS_OPTIONS =new HashMap();static {if ("true".equalsIgnoreCase(System.getenv("HADOOP_JAAS_DEBUG"))) {BASIC_JAAS_OPTIONS.put("debug", "true");}}static final AppConfigurationEntry OS_SPECIFIC_LOGIN =new AppConfigurationEntry(OS_LOGIN_MODULE_NAME,LoginModuleControlFlag.REQUIRED,BASIC_JAAS_OPTIONS);static final AppConfigurationEntry HADOOP_LOGIN =new AppConfigurationEntry(HadoopLoginModule.class.getName(),LoginModuleControlFlag.REQUIRED,   //hadoop login是required. 也就是必须要调用这个loginModule.logBASIC_JAAS_OPTIONS);private final LoginParams params;HadoopConfiguration(LoginParams params) {this.params = params;}@Overridepublic LoginParams getParameters() {return params;}//重写方法根据名字获取AppConfigurationEntry,里面如果是kerbos,就代用getKerberosEntry增加kerbys的AppconfigurationEntry@Overridepublic AppConfigurationEntry[] getAppConfigurationEntry(String appName) {ArrayList entries = new ArrayList<>();// login of external subject passes no params.  technically only// existing credentials should be used but other components expect// the login to succeed with local user fallback if no principal.if (params == null || appName.equals(SIMPLE_CONFIG_NAME)) {entries.add(OS_SPECIFIC_LOGIN);} else if (appName.equals(KERBEROS_CONFIG_NAME)) {// existing semantics are the initial default login allows local user// fallback. this is not allowed when a principal explicitly// specified or during a relogin.if (!params.containsKey(LoginParam.PRINCIPAL)) {entries.add(OS_SPECIFIC_LOGIN);}entries.add(getKerberosEntry());}//这里添加了Hadooplogin的entryentries.add(HADOOP_LOGIN);return entries.toArray(new AppConfigurationEntry[0]);}private AppConfigurationEntry getKerberosEntry() {final Map options = new HashMap<>(BASIC_JAAS_OPTIONS);LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;// kerberos login is mandatory if principal is specified.  principal// will not be set for initial default login, but will always be set// for relogins.final String principal = params.get(LoginParam.PRINCIPAL);if (principal != null) {options.put("principal", principal);controlFlag = LoginModuleControlFlag.REQUIRED;}// use keytab if given else fallback to ticket cache.if (IBM_JAVA) {if (params.containsKey(LoginParam.KEYTAB)) {final String keytab = params.get(LoginParam.KEYTAB);if (keytab != null) {options.put("useKeytab", prependFileAuthority(keytab));} else {options.put("useDefaultKeytab", "true");}options.put("credsType", "both");} else {String ticketCache = params.get(LoginParam.CCACHE);if (ticketCache != null) {options.put("useCcache", prependFileAuthority(ticketCache));} else {options.put("useDefaultCcache", "true");}options.put("renewTGT", "true");}} else {if (params.containsKey(LoginParam.KEYTAB)) {options.put("useKeyTab", "true");final String keytab = params.get(LoginParam.KEYTAB);if (keytab != null) {options.put("keyTab", keytab);}options.put("storeKey", "true");} else {options.put("useTicketCache", "true");String ticketCache = params.get(LoginParam.CCACHE);if (ticketCache != null) {options.put("ticketCache", ticketCache);}options.put("renewTGT", "true");}options.put("doNotPrompt", "true");}options.put("refreshKrb5Config", "true");return new AppConfigurationEntry(KRB5_LOGIN_MODULE, controlFlag, options);}private static String prependFileAuthority(String keytabPath) {return keytabPath.startsWith("file://")? keytabPath: "file://" + keytabPath;}}

如果是kerbos的,就会增加 kerbos login entry

entries.add(getKerberosEntry()); 
默认的controlFlag是 可选的
LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;
// kerberos login is mandatory if principal is specified.  principal
// will not be set for initial default login, but will always be set
// for relogins.
final String principal = params.get(LoginParam.PRINCIPAL);
if (principal != null) {options.put("principal", principal);controlFlag = LoginModuleControlFlag.REQUIRED;
}

不管是哪种登录设置都会增加 HadoopLoginEntry

entries.add(HADOOP_LOGIN);

并且 HADOOP_LOGIN 的LoginModuleControlFlag 是REQUIRED 的。

所以要必须调用HadoopLoginModule.login

/*** A login module that looks at the Kerberos, Unix, or Windows principal and* adds the corresponding UserName.*/@InterfaceAudience.Privatepublic static class HadoopLoginModule implements LoginModule {private Subject subject;@Overridepublic boolean abort() throws LoginException {return true;}private  T getCanonicalUser(Class cls) {for(T user: subject.getPrincipals(cls)) {return user;}return null;}@Overridepublic boolean commit() throws LoginException {LOG.debug("hadoop login commit");// if we already have a user, we are done.if (!subject.getPrincipals(User.class).isEmpty()) {LOG.debug("Using existing subject: {}", subject.getPrincipals());return true;}//获取kerberos登录后的principal, 使用kerboers登录后的principal 构造Hadoop 的UserPrincipal user = getCanonicalUser(KerberosPrincipal.class);if (user != null) {LOG.debug("Using kerberos user: {}", user);}//If we don't have a kerberos user and security is disabled, check//if user is specified in the environment or propertiesif (!isSecurityEnabled() && (user == null)) {String envUser = System.getenv(HADOOP_USER_NAME);if (envUser == null) {envUser = System.getProperty(HADOOP_USER_NAME);}user = envUser == null ? null : new User(envUser);}//如果没有获取到kerberos登录后的principal, 使用操作系统登录后的principal 构造Hadoop 的User// use the OS userif (user == null) {user = getCanonicalUser(OS_PRINCIPAL_CLASS);LOG.debug("Using local user: {}", user);}// if we found the user, add our principalif (user != null) {LOG.debug("Using user: \"{}\" with name: {}", user, user.getName());User userEntry = null;try {// LoginContext will be attached later unless it's an external// subject.AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;userEntry = new User(user.getName(), authMethod, null);} catch (Exception e) {throw (LoginException)(new LoginException(e.toString()).initCause(e));}LOG.debug("User entry: \"{}\"", userEntry);subject.getPrincipals().add(userEntry);return true;}throw new LoginException("Failed to find user in name " + subject);}@Overridepublic void initialize(Subject subject, CallbackHandler callbackHandler,Map sharedState, Map options) {this.subject = subject;}@Overridepublic boolean login() throws LoginException {LOG.debug("Hadoop login");return true;}@Overridepublic boolean logout() throws LoginException {LOG.debug("Hadoop logout");return true;}}

这里是重点:

重点1: 优先使用kerberos登录后的principal, 构造Hadoop 的User

重点2: 如果没有获取到kerberos登录后的principal, 使用操作系统登录后的principal 构造Hadoop 的User

重点3: 不管是哪种登录认证,最终是要构造 User.class类型的principal,作为hadoop的登录用户。

到此,我们获得了登录用户。

TOKEN相关认证

UserGroupInfomration里还有 关于使用Token作为凭着的subject. 这是通过调用getDelegationToken获取到的token,构建了一个subject, 里面的creds就是token. 当AuthMethod是Token的时候,就获取subject.getCredis.get(Token.cass) 的方式获取token.

这样就能大概对UserGroupInformation有一个大概的了解。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部