当我尝试使用以下代码将数据从S3存储桶加载到AWS ES时,我可以获取请求发布状态代码为201。
from __future__ import print_function
import boto3
import datetime
import urllib
import urllib3
import logging
from pprint import pprint
import csv
import io
import json
from collections import defaultdict
from requests_aws4auth import AWS4Auth
import requests
globalVars = {}
globalVars['Owner'] = "cyberdemo"
globalVars['Environment'] = "Prod"
globalVars['awsRegion'] = "us-east-2"
globalVars['tagName'] = "serverless-s3-to-es-log-ingester"
globalVars['service'] = "es"
globalVars['esIndexPrefix'] = "es-logs-"
globalVars['esIndexDocType'] = "es_docs"
globalVars['esHosts'] = {
'test': '' ,
'prod': 'https://search-cyberlabs-xxxxxxxxxxx.us-east-2.es.amazonaws.com'
}
s3 = boto3.client('s3')
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth( credentials.access_key,
credentials.secret_key,
globalVars['awsRegion'],
globalVars['service'],
session_token=credentials.token
)
obj = s3.get_object(Bucket = 'bucket-name', Key = 'logs/conn_log.csv')
lines = obj['Body'].read().decode("utf-8").replace("'", '"')
indexName = globalVars['esIndexPrefix'] + str( datetime.date.today().year ) + '-' + str( datetime.date.today().month )
es_Url = globalVars['esHosts'].get('prod') + '/' + indexName + '/' + globalVars['esIndexDocType']
lines = lines.splitlines()
if (isinstance(lines, str)):
lines = [lines]
docData = {}
docData['objectKey'] = str(key)
docData['createdDate'] = str(obj['LastModified'])
docData['content_type'] = str(obj['ContentType'])
docData['content_length'] = str(obj['ContentLength'])
for line in lines:
docData['content'] = str(line)
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
打印docData后,我得到以下输出。如您所见,“内容”包括单行数据。
{'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888', 'content': '16/03/2012 20:30:00,Cozi0S1MAcO3HMgufa,192.168.202.79,17/04/2026 00:00:00,192.168.229.254,18/03/1901 00:00:00,tcp,ssl,00/01/1900 00:14:24,1/7/1901 0:00,25/11/1902 00:00:00,SF,-,00/01/1900 00:00:00,ShADadfFr,8/1/1900 0:00,29/08/1902 00:00:00,13/01/1900 00:00:00,9/10/1904 0:00'}
相反,我想按如下所示拆分内容
{'\ufeffts': '16/03/2012 20:30:00', 'uid': 'Cozi0S1MAcO3HMgufa', 'id.orig_h': '192.168.202.79', 'id.orig_p': '17/04/2026 00:00:00', 'id.resp_h': '192.168.229.254', 'id.resp_p': '18/03/1901 00:00:00', 'proto': 'tcp', 'service': 'ssl', 'duration': '00/01/1900 00:14:24', 'orig_bytes': '1/7/1901 0:00', 'resp_bytes': '25/11/1902 00:00:00', 'conn_state': 'SF', 'local_orig': '-', 'missed_bytes': '00/01/1900 00:00:00', 'history': 'ShADadfFr', 'orig_pkts': '8/1/1900 0:00', 'orig_ip_bytes': '29/08/1902 00:00:00', 'resp_pkts': '13/01/1900 00:00:00', 'resp_ip_bytes': '9/10/1904 0:00', 'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888'}
因此,我从分割线部分修改了上面的代码。使用以下代码发布请求后,我得到的状态码为400。
buf = io.StringIO(lines)
reader = csv.DictReader(buf)
for line in reader:
line['objectKey'] = str(key)
line['createdDate'] = str(obj['LastModified'])
line['content_type'] = str(obj['ContentType'])
line['content_length'] = str(obj['ContentLength'])
Data = json.dumps(line)
docData = Data.replace('"', "'")
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
print(resp)
我需要获取状态代码为201的修改后的代码。谁能帮忙?谢谢