Saving the output of a lambda expression to a text file using Python

I have a 2 GB log file from which I am trying to filter lines matching a specific pattern and save the filtered lines to a text file. Here is my Python code (nginxDigger.py):

from pyspark import SparkContext

sc = SparkContext()

# Read the gzipped nginx access log line by line.
txtFile = sc.textFile("/akshay/172__20200204-0557.access.log.gz")

# Keep only the lines containing the conversion-tracking pattern.
filtered = txtFile.filter(lambda x: "Conversion?" in x)

# Write the filtered lines out. Despite the .txt name, saveAsTextFile
# creates a directory of part files at this path, not a single file.
filtered.saveAsTextFile("/home/hdduser/infra/python/optest.txt")
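From what I have read, saveAsTextFile always writes a directory of part files (part-00000, ...) rather than a single file, and it fails if the target path already exists. If a single output file is the goal, a minimal sketch I am considering coalesces to one partition first (the "optest_out" path is just a placeholder):

from pyspark import SparkContext

sc = SparkContext()

# Same read and filter as above.
txt_file = sc.textFile("/akshay/172__20200204-0557.access.log.gz")
filtered = txt_file.filter(lambda x: "Conversion?" in x)

# coalesce(1) forces a single output partition, so the output directory
# ("optest_out" is a placeholder name) contains exactly one part-00000 file.
filtered.coalesce(1).saveAsTextFile("/home/hdduser/infra/python/optest_out")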

Sample log line:

2409:ssdd:dfg:we45:c45r:9ccc:ccc1:ccc5 - - [04/Feb/2020:04:30:35 +0000] "GET /Tracker/Conversion?p1=2244&p2=173139901&gtmcb=314023959 HTTP/1.1" 200 177 "https://termlife.policybazaar.com/?offerid=4002&utm_source=tracktrack&utm_medium=affiliate&utm_term=tracktrack_5e38f30cc2567200019b874b&utm_campaign=5e38f30cc2567200019b874b" "Mozilla/5.0 (Linux; Android 8.1.0; vivo 1724) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.93 Mobile Safari/537.36" "-" IID="-" ECG="-" 192.123.123.123:8080 upstream_rt=0.004 total_rs=0.004
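In case the plain "Conversion?" substring ever matches unrelated lines, I am also considering a stricter filter anchored on the request path with a regex. A sketch, where the exact pattern is my assumption about what counts as a conversion hit:

import re

# Match GET requests to the /Tracker/Conversion endpoint. The pattern is
# an assumption and may need adjusting for other request methods.
CONVERSION_RE = re.compile(r'"GET /Tracker/Conversion\?')

def is_conversion(line):
    # True when the access-log line records a conversion request.
    return CONVERSION_RE.search(line) is not None

filtered = txtFile.filter(is_conversion)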

I don't see any errors in the logs. I am running the code above with the following spark-submit command:

spark-submit --executor-memory 3g --num-executors 5 --executor-cores 1 /home/hdduser/infra/python/nginxDigger.py

Here are the logs:

20200206
20/02/07 07:04:12 INFO SparkContext: Running Spark version 2.2.1
20/02/07 07:04:12 WARN SparkConf: Detected deprecated memory fraction settings: [spark.shuffle.memoryFraction]. As of Spark 1.6, execution and storage memory management are unified. All memory fractions used in the old model are now deprecated and no longer read. If you wish to use the old memory management, you may explicitly enable `spark.memory.useLegacyMode` (not recommended).
20/02/07 07:04:12 INFO SparkContext: Submitted application: nginxDigger.py
20/02/07 07:04:12 INFO SecurityManager: Changing view acls to: hduser
20/02/07 07:04:12 INFO SecurityManager: Changing modify acls to: hduser
20/02/07 07:04:12 INFO SecurityManager: Changing view acls groups to:
20/02/07 07:04:12 INFO SecurityManager: Changing modify acls groups to:
20/02/07 07:04:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hdduser); groups with view permissions: Set(); users  with modify permissions: Set(hdduser); groups with modify permissions: Set()
20/02/07 07:04:13 INFO Utils: Successfully started service 'sparkDriver' on port 34441.
20/02/07 07:04:13 INFO SparkEnv: Registering MapOutputTracker
20/02/07 07:04:13 INFO SparkEnv: Registering BlockManagerMaster
20/02/07 07:04:13 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/02/07 07:04:13 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/02/07 07:04:13 INFO DiskBlockManager: Created local directory at /usr/local/spark/tmp/blockmgr-4a81eafa-5b81-410b-8832-36273894847
20/02/07 07:04:13 INFO MemoryStore: MemoryStore started with capacity 3.6 GB
20/02/07 07:04:13 INFO SparkEnv: Registering OutputCommitCoordinator
20/02/07 07:04:13 INFO Utils: Successfully started service 'SparkUI' on port AAAA.
20/02/07 07:04:14 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.xxx.xxx.xxx:AAAA
20/02/07 07:04:14 INFO FairSchedulableBuilder: Creating Fair Scheduler pools from default file: fairscheduler.xml
20/02/07 07:04:14 INFO FairSchedulableBuilder: Created pool: default, schedulingMode: FAIR, minShare: 2, weight: 5
20/02/07 07:04:14 INFO FairSchedulableBuilder: Created pool: production, schedulingMode: FAIR, minShare: 2, weight: 4
20/02/07 07:04:14 INFO FairSchedulableBuilder: Created pool: test, schedulingMode: FIFO, minShare: 3, weight: 2
20/02/07 07:04:15 INFO RMProxy: Connecting to ResourceManager at HDP-master/192.xxx.xxx.xxx:ABCD
20/02/07 07:04:15 INFO Client: Requesting a new application from cluster with 3 NodeManagers
20/02/07 07:04:15 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (10752 MB per container)
20/02/07 07:04:15 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
20/02/07 07:04:15 INFO Client: Setting up container launch context for our AM
20/02/07 07:04:15 INFO Client: Setting up the launch environment for our AM container
20/02/07 07:04:15 INFO Client: Preparing resources for our AM container
20/02/07 07:04:16 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
20/02/07 07:04:19 INFO Client: Uploading resource file:/usr/local/spark/tmp/spark-dhfjfkk38hc-3wdd-dfd3-88a8-2350daf88e3a/__spark_libs__3748498372.zip -> hdfs://HDP-MASTER:54310/user/hduser/.sparkStaging/application_273849484568_0033/__spark_libs__28494847578593.zip
20/02/07 07:04:23 INFO Client: Uploading resource file:/usr/local/spark/python/lib/pyspark.zip -> hdfs://HDP-MASTER:54310/user/hduser/.sparkStaging/application_273849484568_0033/pyspark.zip
20/02/07 07:04:23 INFO Client: Uploading resource file:/usr/local/spark/python/lib/py4j-0.10.4-src.zip -> hdfs://HDP-MASTER:54310/user/hduser/.sparkStaging/application_273849484568_0033/py4j-0.10.4-src.zip
20/02/07 07:04:23 INFO Client: Uploading resource file:/usr/local/spark/tmp/spark-fa5d15ce-3ff0-42f6-88a8-2350daf88e3a/__spark_conf__7839847583933.zip -> hdfs://HDP-MASTER:54310/user/hduser/.sparkStaging/application_273849484568_0033/__spark_conf__.zip
20/02/07 07:04:23 INFO SecurityManager: Changing view acls to: hdduser
20/02/07 07:04:23 INFO SecurityManager: Changing modify acls to: hdduser
20/02/07 07:04:23 INFO SecurityManager: Changing view acls groups to:
20/02/07 07:04:23 INFO SecurityManager: Changing modify acls groups to:
20/02/07 07:04:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hdduser); groups with view permissions: Set(); users  with modify permissions: Set(hdduser); groups with modify permissions: Set()
20/02/07 07:04:23 INFO Client: Submitting application application_273849484568_0033 to ResourceManager
20/02/07 07:04:23 INFO YarnClientImpl: Submitted application application_273849484568_0033
20/02/07 07:04:23 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_273849484568_0033 and attemptId None
20/02/07 07:04:24 INFO Client: Application report for application_273849484568_0033 (state: ACCEPTED)
20/02/07 07:04:24 INFO Client:
         client token: N/A
         diagnostics: AM container is launched, waiting for AM container to Register with RM
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.hduser
         start time: 1581059063800
         final status: UNDEFINED
         tracking URL: http://li1756-36.members.linode.com:8056/proxy/application_1580994258764_0033/
         user: hduser
20/02/07 07:04:25 INFO Client: Application report for application_273849484568_0033 (state: ACCEPTED)
20/02/07 07:04:26 INFO Client: Application report for application_273849484568_0033 (state: ACCEPTED)
20/02/07 07:04:27 INFO Client: Application report for application_273849484568_0033 (state: ACCEPTED)
20/02/07 07:04:28 INFO Client: Application report for application_273849484568_0033 (state: ACCEPTED)
20/02/07 07:04:28 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
20/02/07 07:04:29 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> li1756-36.members.linode.com, PROXY_URI_BASES -> http://li1756-36.members.linode.com:8056/proxy/application_273849484568_0033), /proxy/application_273849484568_0033
20/02/07 07:04:29 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
20/02/07 07:04:29 INFO Client: Application report for application_273849484568_0033 (state: RUNNING)
20/02/07 07:04:29 INFO Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 192.xxx.xxx.xxx
         ApplicationMaster RPC port: 0
         queue: root.hduser
         start time: 273849484568
         final status: UNDEFINED
         tracking URL: http://shflfof.jfkfjfys.com:ABCD/proxy/application_273849484568_0033/
         user: hduser
20/02/07 07:04:29 INFO YarnClientSchedulerBackend: Application application_273849484568_0033 has started running.
20/02/07 07:04:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41855.
20/02/07 07:04:29 INFO NettyBlockTransferService: Server created on 192.xxx.xxx.xxx:4ABCD
20/02/07 07:04:29 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/02/07 07:04:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.xxx.xxx.xxx, 4DCBA, None)
20/02/07 07:04:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.xxx.xxx.xxx:4ABCD with 3.6 GB RAM, BlockManagerId(driver, 192.xxx.xxx.xxx, 4ABCD, None)
20/02/07 07:04:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.xxx.xxx.xxx, 4ABCD, None)
20/02/07 07:04:29 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.xxx.xxx.xxx, 4ABCD, None)
20/02/07 07:04:30 INFO EventLoggingListener: Logging events to hdfs://hdp-master:54310/spark-logs/application_273849484568_0033
20/02/07 07:04:33 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.xxx.abc.abc:4DCBA) with ID 2
20/02/07 07:04:33 INFO BlockManagerMasterEndpoint: Registering block manager HDP-slave-2:34693 with 1458.6 MB RAM, BlockManagerId(2, HDP-slave-2, 34693, None)
20/02/07 07:04:34 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.xxx.abc.abc:34374) with ID 1
20/02/07 07:04:34 INFO BlockManagerMasterEndpoint: Registering block manager HDP-slave-3:25179 with 1458.6 MB RAM, BlockManagerId(1, HDP-slave-3, 25179, None)
20/02/07 07:04:44 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
20/02/07 07:04:44 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 286.0 KB, free 3.6 GB)
20/02/07 07:04:44 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.0 KB, free 3.6 GB)
20/02/07 07:04:44 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.xxx.xxx.xxx:41855 (size: 24.0 KB, free: 3.6 GB)
20/02/07 07:04:44 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0
20/02/07 07:04:44 INFO FileInputFormat: Total input paths to process : 1
20/02/07 07:04:44 INFO NetworkTopology: Adding a new node: /default-rack/192.xxx.abc.xxx:50010
20/02/07 07:04:44 INFO NetworkTopology: Adding a new node: /default-rack/192.xxx.abc.xxx:50010
20/02/07 07:04:44 INFO NetworkTopology: Adding a new node: /default-rack/192.xxx.abc.xxx:50010
20/02/07 07:04:44 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/02/07 07:04:44 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:455) with 1 output partitions
20/02/07 07:04:44 INFO DAGScheduler: Final stage: ResultStage 0 (runJob at PythonRDD.scala:455)
20/02/07 07:04:44 INFO DAGScheduler: Parents of final stage: List()
20/02/07 07:04:44 INFO DAGScheduler: Missing parents: List()
20/02/07 07:04:44 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at RDD at PythonRDD.scala:48), which has no missing parents
20/02/07 07:04:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.4 KB, free 3.6 GB)
20/02/07 07:04:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 3.6 GB)
20/02/07 07:04:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.xxx.xxx.xxx:4ABCD (size: 3.4 KB, free: 3.6 GB)
20/02/07 07:04:45 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
20/02/07 07:04:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[2] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/02/07 07:04:45 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
20/02/07 07:04:45 INFO FairSchedulableBuilder: Added task set TaskSet_0.0 tasks to pool default
20/02/07 07:04:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, HDP-slave-3, executor 1, partition 0, RACK_LOCAL, 4903 bytes)
20/02/07 07:04:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on HDP-slave-3:25179 (size: 3.4 KB, free: 1458.6 MB)
20/02/07 07:04:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on HDP-slave-3:25179 (size: 24.0 KB, free: 1458.6 MB)
20/02/07 07:04:47 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2199 ms on HDP-slave-3 (executor 1) (1/1)
20/02/07 07:04:47 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool default
20/02/07 07:04:47 INFO DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:455) finished in 2.218 s
20/02/07 07:04:47 INFO DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:455, took 2.398177 s
20/02/07 07:04:47 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/02/07 07:04:47 INFO SparkContext: Starting job: saveAsTextFile at NativeMethodAccessorImpl.java:0
20/02/07 07:04:47 INFO DAGScheduler: Got job 1 (saveAsTextFile at NativeMethodAccessorImpl.java:0) with 1 output partitions
20/02/07 07:04:47 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at NativeMethodAccessorImpl.java:0)
20/02/07 07:04:47 INFO DAGScheduler: Parents of final stage: List()
20/02/07 07:04:47 INFO DAGScheduler: Missing parents: List()
20/02/07 07:04:47 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at NativeMethodAccessorImpl.java:0), which has no missing parents
20/02/07 07:04:47 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 76.4 KB, free 3.6 GB)
20/02/07 07:04:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 29.1 KB, free 3.6 GB)
20/02/07 07:04:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.xxx.xxx.xxx:4ABCD (size: 29.1 KB, free: 3.6 GB)
20/02/07 07:04:47 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
20/02/07 07:04:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/02/07 07:04:47 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
20/02/07 07:04:47 INFO FairSchedulableBuilder: Added task set TaskSet_1.0 tasks to pool default
20/02/07 07:04:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, HDP-slave-2, executor 2, partition 0, RACK_LOCAL, 4903 bytes)
20/02/07 07:04:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on HDP-slave-2:34693 (size: 29.1 KB, free: 1458.6 MB)
20/02/07 07:04:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on HDP-slave-2:34693 (size: 24.0 KB, free: 1458.5 MB)
20/02/07 07:07:21 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 154046 ms on HDP-slave-2 (executor 2) (1/1)
20/02/07 07:07:21 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool default
20/02/07 07:07:21 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at NativeMethodAccessorImpl.java:0) finished in 154.047 s
20/02/07 07:07:21 INFO DAGScheduler: Job 1 finished: saveAsTextFile at NativeMethodAccessorImpl.java:0, took 154.076742 s
20/02/07 07:07:21 INFO SparkContext: Invoking stop() from shutdown hook
20/02/07 07:07:21 INFO SparkUI: Stopped Spark web UI at http://192.xxx.xxx.xxx:ABCD
20/02/07 07:07:21 INFO YarnClientSchedulerBackend: Interrupting monitor thread
20/02/07 07:07:21 INFO YarnClientSchedulerBackend: Shutting down all executors
20/02/07 07:07:21 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/02/07 07:07:21 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
20/02/07 07:07:21 INFO YarnClientSchedulerBackend: Stopped
20/02/07 07:07:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/02/07 07:07:21 INFO MemoryStore: MemoryStore cleared
20/02/07 07:07:21 INFO BlockManager: BlockManager stopped
20/02/07 07:07:21 INFO BlockManagerMaster: BlockManagerMaster stopped
20/02/07 07:07:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/02/07 07:07:21 INFO SparkContext: Successfully stopped SparkContext
20/02/07 07:07:21 INFO ShutdownHookManager: Shutdown hook called
20/02/07 07:07:21 INFO ShutdownHookManager: Deleting directory /usr/local/spark/tmp/spark-fa5d15ce-3ff0-42f6-88a8-2350daf88e3a
20/02/07 07:07:21 INFO ShutdownHookManager: Deleting directory /usr/local/spark/tmp/spark-fa5d15ce-3ff0-42f6-88a8-2350daf88e3a/pyspark-437bd821-345f-4e19-b650-37a335078bf3
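One thing I notice in the log above: each stage ran as a single task on a single executor (TID 1 alone took about 154 s on HDP-slave-2). As far as I understand, a .gz input is not splittable, so the whole file arrives as one partition and the extra executors requested on the command line stay idle. A sketch that repartitions after the read (the partition count of 8 is an arbitrary guess, not a tuned value):

# gzip is not a splittable codec, so textFile yields a single partition.
# Repartitioning after the read spreads the downstream work across
# executors; 8 partitions is an arbitrary choice here.
txtFile = sc.textFile("/akshay/172__20200204-0557.access.log.gz").repartition(8)
filtered = txtFile.filter(lambda x: "Conversion?" in x)
filtered.saveAsTextFile("/home/hdduser/infra/python/optest_out")

Whether this actually pays off is unclear to me, since the single-pass decompression may dominate the runtime for a cheap substring filter.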

I am new to Python and Spark. Any suggestions/help would be much appreciated.
