无法向AWS Elasticsearch Service发布请求

当我尝试使用以下代码将数据从S3存储桶加载到AWS ES时,我可以获取请求发布状态代码为201。

from __future__ import print_function
import boto3
import datetime
import urllib
import urllib3
import logging
from pprint import pprint
import csv
import io
import json
from collections import defaultdict
from requests_aws4auth import AWS4Auth
import requests 

globalVars  = {}
globalVars['Owner']                 = "cyberdemo"
globalVars['Environment']           = "Prod"
globalVars['awsRegion']             = "us-east-2"
globalVars['tagName']               = "serverless-s3-to-es-log-ingester"
globalVars['service']               = "es"
globalVars['esIndexPrefix']         = "es-logs-"
globalVars['esIndexDocType']        = "es_docs"
globalVars['esHosts']               = {
                                        'test': '' ,
                                        'prod': 'https://search-cyberlabs-xxxxxxxxxxx.us-east-2.es.amazonaws.com'
                                        }

s3 = boto3.client('s3')
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth( credentials.access_key, 
                        credentials.secret_key, 
                        globalVars['awsRegion'], 
                        globalVars['service'], 
                        session_token=credentials.token
                    )
obj = s3.get_object(Bucket = 'bucket-name', Key = 'logs/conn_log.csv')
lines = obj['Body'].read().decode("utf-8").replace("'", '"')

indexName = globalVars['esIndexPrefix'] + str( datetime.date.today().year ) + '-' + str( datetime.date.today().month )
es_Url = globalVars['esHosts'].get('prod') + '/' + indexName + '/' + globalVars['esIndexDocType']

lines = lines.splitlines()
if (isinstance(lines, str)):
        lines = [lines]
docData = {}
docData['objectKey']        = str(key)
docData['createdDate']      = str(obj['LastModified'])
docData['content_type']     = str(obj['ContentType'])
docData['content_length']   = str(obj['ContentLength'])

for line in lines:
        docData['content'] = str(line)

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)

打印docData后,我得到以下输出。如您所见,“内容”包括单行数据。

{'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888', 'content': '16/03/2012 20:30:00,Cozi0S1MAcO3HMgufa,192.168.202.79,17/04/2026 00:00:00,192.168.229.254,18/03/1901 00:00:00,tcp,ssl,00/01/1900 00:14:24,1/7/1901 0:00,25/11/1902 00:00:00,SF,-,00/01/1900 00:00:00,ShADadfFr,8/1/1900 0:00,29/08/1902 00:00:00,13/01/1900 00:00:00,9/10/1904 0:00'}

相反,我想按如下所示拆分内容

{'\ufeffts': '16/03/2012 20:30:00', 'uid': 'Cozi0S1MAcO3HMgufa', 'id.orig_h': '192.168.202.79', 'id.orig_p': '17/04/2026 00:00:00', 'id.resp_h': '192.168.229.254', 'id.resp_p': '18/03/1901 00:00:00', 'proto': 'tcp', 'service': 'ssl', 'duration': '00/01/1900 00:14:24', 'orig_bytes': '1/7/1901 0:00', 'resp_bytes': '25/11/1902 00:00:00', 'conn_state': 'SF', 'local_orig': '-', 'missed_bytes': '00/01/1900 00:00:00', 'history': 'ShADadfFr', 'orig_pkts': '8/1/1900 0:00', 'orig_ip_bytes': '29/08/1902 00:00:00', 'resp_pkts': '13/01/1900 00:00:00', 'resp_ip_bytes': '9/10/1904 0:00', 'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888'}

因此,我从分割线部分修改了上面的代码。使用以下代码发布请求后,我得到的状态码为400。

buf = io.StringIO(lines)
reader = csv.DictReader(buf)
for line in reader:
    line['objectKey']        = str(key)
    line['createdDate']      = str(obj['LastModified'])
    line['content_type']     = str(obj['ContentType'])
    line['content_length']   = str(obj['ContentLength'])
    Data = json.dumps(line)
    docData = Data.replace('"', "'")

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)

print(resp)

我需要获取状态代码为201的修改后的代码。谁能帮忙?谢谢

评论