在pyspark中测试累加器,但出错了:
def test():
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf).getOrCreate()
rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
Row(user="hadoop", item="book"), Row(user="python", item="duck")])
acc = sc.accumulator(0)
print("accumulator: {}".format(acc))
def imap(row):
global acc
acc += 1
return row
rdds.map(imap).foreach(print)
print(acc.value)
错误是:
...
return f(*args, **kwargs)
File "test_als1.py", line 205, in imap
acc += 1
NameError: name 'acc' is not defined
But I set the acc
as global variable, how can I write the code?