python模拟用户登录注册定义函数user_在Python类中注册spark SQL用户定义函数

我正在通过pythonapi使用Spark进行一些数据处理。下面是我正在学习的课程的一个简化部分:class data_processor(object):

def __init__(self,filepath):

self.config = Config() # this loads some config options from file

self.type_conversions = {int:IntegerType,str:StringType}

self.load_data(filepath)

self.format_age()

def load_data(self,filepath,delim='\x01'):

cols = [...] # list of column names

types = [int, str, str, ... ] # list of column types

user_data = sc.textFile(filepath,use_unicode=False).map(lambda row: [types[i](val) for i,val in enumerate(row.strip().split(delim))])

fields = StructType([StructField(field_name,self.type_conversions[field_type]()) for field_name,field_type in zip(cols,types)])

self.user_data = user_data.toDF(fields)

self.user_data.registerTempTable('data')

def format_age(self):

age_range = self.config.age_range # tuple of (age_min, age_max)

age_bins = self.config.age_bins # list of bin boundaries

def _format_age(age):

if ageage_range[1]:

return None

else:

return np.digitize([age],age_bins)[0]

sqlContext.udf.register('format_age', lambda x: _format_age(x), IntegerType())

现在,如果我用data=data_processor(filepath)实例化类,我可以很好地对数据帧进行查询。例如,这是有效的:

^{pr2}$

但我显然没有正确地建立udf。例如,如果我尝试sqlContext.sql("select age, format_age(age) from data limit 10").take(1)

我得到一个错误:Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

(有一个很长的stacktrace,典型的Spark,太长了,不能包含在这里)。在

那么,我到底做错了什么?在这样的方法(最好是作为类方法)中定义UDF的正确方法是什么。我知道Spark不喜欢传递类对象,因此使用了format_age(灵感来自this question)的嵌套结构。在

有什么想法?在


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部