使用aws cli连接eks
使用aws cli连接eks
下载命令行:
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
解压:
unzip awscliv2.zip
安装:
./aws/install
配置access_key和access_secret:
]# aws configure
配置文件:
]# cd .aws/ ]# cat config [default] region = us-west-1 ]# cat credentials [default] aws_access_key_id = xxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxx
更新kubeconfig配置:
aws eks update-kubeconfig --name <cluster-name>
查看资源:
]# kubectl get pods -A NAMESPACE NAME READY STATUS RESTARTS AGE kube-system aws-node-j8gpf 1/1 Running 0 7h23m kube-system aws-node-pm9dd 1/1 Running 0 7h23m kube-system coredns-db9fb9979-7jmhl 1/1 Running 0 7h21m kube-system coredns-db9fb9979-xq6zn 1/1 Running 0 7h21m kube-system kube-proxy-2rnhl 1/1 Running 0 7h23m kube-system kube-proxy-xzpbm 1/1 Running 0 7h23m
实例:
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" apt install unzip unzip awscliv2.zip sudo ./aws/install /usr/local/bin/aws --version aws configure aws --version aws s3 ls aws eks update-kubeconfig --region us-west-2 --name eks-dev vim .kube/config kubectl get nodes
eks上安装aws原生的ingressController
安装eksctl:
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp sudo mv /tmp/eksctl /usr/local/bin eksctl version
搞OIDC:
oidc_id=$(aws eks describe-cluster --name mexico --query "cluster.identity.oidc.issuer" --output text | cut -d '/' -f 5) aws iam list-open-id-connect-providers | grep $oidc_id | cut -d "/" -f4 eksctl utils associate-iam-oidc-provider --cluster mexico --approve
策略:
]# aws iam create-policy --policy-name AWSLoadBalancerControllerIAMPolicy --policy-document file://iam_policy.json
{
"Policy": {
"PolicyName": "AWSLoadBalancerControllerIAMPolicy",
"PolicyId": "ANPAQWDHG3XXXXXXXXXXXXXX",
"Arn": "arn:aws:iam::011111111111:policy/AWSLoadBalancerControllerIAMPolicy",
"Path": "/",
"DefaultVersionId": "v1",
"AttachmentCount": 0,
"PermissionsBoundaryUsageCount": 0,
"IsAttachable": true,
"CreateDate": "2023-03-13T07:22:07+00:00",
"UpdateDate": "2023-03-13T07:22:07+00:00"
}
}

创建serviceaccount:
]# eksctl create iamserviceaccount \
> --cluster=mexico \
> --namespace=kube-system \
> --name=aws-load-balancer-controller \
> --role-name AmazonEKSLoadBalancerControllerRole \
> --attach-policy-arn=arn:aws:iam::01111111111:policy/AWSLoadBalancerControllerIAMPolicy \
> --approve
2023-03-13 15:28:47 [ℹ] 1 iamserviceaccount (kube-system/aws-load-balancer-controller) was included (based on the include/exclude rules)
2023-03-13 15:28:47 [!] serviceaccounts that exist in Kubernetes will be excluded, use --override-existing-serviceaccounts to override
2023-03-13 15:28:47 [ℹ] 1 task: {
2 sequential sub-tasks: {
create IAM role for serviceaccount "kube-system/aws-load-balancer-controller",
create serviceaccount "kube-system/aws-load-balancer-controller",
} }2023-03-13 15:28:47 [ℹ] building iamserviceaccount stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:28:47 [ℹ] deploying stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:28:48 [ℹ] waiting for CloudFormation stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:29:19 [ℹ] waiting for CloudFormation stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:29:20 [ℹ] created serviceaccount "kube-system/aws-load-balancer-controller"

用helm安装:
helm repo add eks https://aws.github.io/eks-charts helm repo update
安装ingressController
]# helm install aws-load-balancer-controller eks/aws-load-balancer-controller \ > -n kube-system \ > --set clusterName=mexico \ > --set serviceAccount.create=false \ > --set serviceAccount.name=aws-load-balancer-controller WARNING: Kubernetes configuration file is group-readable. This is insecure. Location: /root/.kube/config WARNING: Kubernetes configuration file is world-readable. This is insecure. Location: /root/.kube/config NAME: aws-load-balancer-controller LAST DEPLOYED: Mon Mar 13 15:41:36 2023 NAMESPACE: kube-system STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: AWS Load Balancer controller installed!
]# kubectl get ingressclass
NAME CONTROLLER PARAMETERS AGE alb ingress.k8s.aws/alb <none> 49s
查看资源:
]# kubectl get pods -n kube-system -l app.kubernetes.io/instance=aws-load-balancer-controller NAME READY STATUS RESTARTS AGE aws-load-balancer-controller-797bbbd7fc-jgwtq 1/1 Running 0 2m9s aws-load-balancer-controller-797bbbd7fc-njvpk 1/1 Running 0 2m9s
aws ingress配置:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:us-west-1:011111111:certificate/xxxxxxx-c1ae-xxxxxxxx-xxxxxxxx
alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS":443}, {"HTTP":80}]'
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/ssl-redirect: "443"
# 需要配置子网subnet或在子网中加上标签,否则不会自动创建alb,详见 docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
alb.ingress.kubernetes.io/subnets: subnet-05008fbb31930c7c0, subnet-071e07e83dd5146f5
alb.ingress.kubernetes.io/target-type: ip
name: admin-api
namespace: prd
spec:
ingressClassName: alb
rules:
- host: api.scriptjc.com
http:
paths:
- backend:
service:
name: api
port:
number: 80
path: /
pathType: Prefix

参考文档:
docs.aws.amazon.com/zh_cn/eks/latest/userguide/aws-load-balancer-controller.html docs.aws.amazon.com/zh_cn/eks/latest/userguide/eksctl.html # ingress annotations 的配置 kubernetes-sigs.github.io/aws-load-balancer-controller/v2.2/guide/ingress/annotations/#annotations
aws上安装prometheus监控:包括创建iam角色,gp2的csi插件等。
dev.to/aws-builders/monitoring-eks-cluster-with-prometheus-and-grafana-1kpb
aws 升级kubectl
使用了新版本的kubectl报错如下:报错显示需要更新kubeconfig文件。
]# kubectl get pods -A Unable to connect to the server: getting credentials: exec plugin is configured to use API version client.authentication.k8s.io/v1beta1, plugin returned version client.authentication.k8s.io/v1alpha1
先更新awscli工具:
]# pip3 install --upgrade awscli
再次使用命令,发现已经没有报错了,不过还需要更新kubeconfig文件。
]# kubectl get pods Kubeconfig user entry is using deprecated API version client.authentication.k8s.io/v1alpha1. Run 'aws eks update-kubeconfig' to update.
更新 kubeconfig 文件:
]# aws eks update-kubeconfig --region us-west-2 --name eks-dev --region us-west-2 # 区域名称可在aws中查看 --name eks-dev # 集群名称就是创建集群时的eks集群名称
再次使用命令,此时就没有任何提示和报错了
]# kubectl get pods No resources found in default namespace.
升级helm:
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 chmod 700 get_helm.sh ./get_helm.sh
aws中eks创建storageclass
一、安装aws-ebs-csi-driver:
1、安装eksctl:
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp mv /tmp/eksctl /usr/bin eksctl version
2、创建associate-iam-oidc-provider:
~]# eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster eks-prod \ --approve
验证结果:
~]# aws eks describe-cluster --name eks-prod --query "cluster.identity.oidc.issuer" --output text https://oidc.eks.us-west-2.amazonaws.com/id/xxxxxxxxxxxxxxxxx
3、创建 iamserviceaccount
~]# eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --cluster eks-prod \ --role-name AmazonEKS_EBS_CSI_DriverRole \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
4、安装addon:安装 aws-ebs-csi-driver
~]# eksctl create addon \ --name aws-ebs-csi-driver \ --cluster eks-prod \ --service-account-role-arn arn:aws:iam::123123123123:role/AmazonEKS_EBS_CSI_DriverRole \ --force
二、创建storageclass:
~]# cat storageclass.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs3 annotations: storageclass.kubernetes.io/is-default-class: "true" provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer parameters: type: gp3
创建pvc:
~]# cat demo-pvc.yaml apiVersion: v1 kind: PersistentVolumeClaim metadata: name: demo-pvc spec: storageClassName: ebs3 accessModes: [ReadWriteOnce] resources: requests: storage: 1G
创建Pod:
~]# cat demo-prod.yaml apiVersion: v1 kind: Pod metadata: name: demo-pod spec: containers: - name: app image: nginx volumeMounts: - name: data mountPath: /usr/share/nginx/html volumes: - name: data persistentVolumeClaim: claimName: demo-pvc
导出cloudwatch监控数据到prometheus
项目地址:
这个项目可以将aws中服务的监控信息导出,然后使用prometheus去抓取exporter里的数据即可。如果只是想看数据的话则不必部署 cloudwatch_exporter,可以在prometheus中的data source里将cloudwatch作为数据源,需要填写AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY 即可,然后导入grafana面板即可浏览监控数据。
github.com/prometheus/cloudwatch_exporter
配置文件:
cat > secret.yaml <<EOF apiVersion: v1 kind: Secret metadata: name: cloudwatch-exporter-secret namespace: cloudwatch type: Opaque data: AWS_ACCESS_KEY_ID: QAtJxxxxxxxxxxxxxxgK # base64 AWS_SECRET_ACCESS_KEY: PktoNE1ZalpLexxxxxxxxxxxxxxzdUZzJXbk5YV2FXYQo= # base64 EOF # 这个配置在项目的sample目录里有 cat > configmap.yaml <<EOF apiVersion: v1 kind: ConfigMap metadata: name: cloudwatch-config namespace: cloudwatch data: config.yml: | region: us-west-2 metrics: - aws_namespace: AWS/ApplicationELB aws_metric_name: HealthyHostCount aws_dimensions: - LoadBalancer - TargetGroup aws_statistics: - Minimum aws_tag_select: resource_type_selection: elasticloadbalancing:targetgroup resource_id_dimension: TargetGroup arn_resource_id_regexp: "(targetgroup/.*)$" tag_selections: Environment: - production - aws_namespace: AWS/ApplicationELB aws_metric_name: UnHealthyHostCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: RequestCount aws_dimensions: - LoadBalancer aws_statistics: - Average - aws_namespace: AWS/ApplicationELB aws_metric_name: TargetResponseTime aws_dimensions: - LoadBalancer aws_statistics: - Average - aws_namespace: AWS/ApplicationELB aws_metric_name: ActiveConnectionCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: NewConnectionCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: RejectedConnectionCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: TargetConnectionErrorCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: RequestCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: IPv6RequestCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: RequestCountPerTarget 
aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: NonStickyRequestCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_Target_2XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_Target_3XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_Target_4XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_Target_5XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_ELB_3XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_ELB_4XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: HTTPCode_ELB_5XX_Count aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: ProcessedBytes aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: IPv6ProcessedBytes aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: ConsumedLCUs delay_seconds: 1800 aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: ClientTLSNegotiationErrorCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: TargetTLSNegotiationErrorCount aws_dimensions: - LoadBalancer aws_statistics: - Sum - aws_namespace: AWS/ApplicationELB aws_metric_name: RuleEvaluations aws_dimensions: - LoadBalancer aws_statistics: - Sum EOF cat > cloudwatch-exporter.yaml <<EOF apiVersion: apps/v1 kind: Deployment metadata: name: cloudwatch-exporter 
namespace: cloudwatch spec: replicas: 1 selector: matchLabels: app: cloudwatch-exporter template: metadata: annotations: prometheus.io/path: /metrics prometheus.io/port: "9106" prometheus.io/scheme: http prometheus.io/scrape: "true" labels: app: cloudwatch-exporter spec: containers: - name: cloudwatch-exporter image: quay.io/prometheus/cloudwatch-exporter:latest env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: cloudwatch-exporter-secret key: AWS_ACCESS_KEY_ID - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: cloudwatch-exporter-secret key: AWS_SECRET_ACCESS_KEY ports: - containerPort: 9106 resources: requests: cpu: 100m memory: 100Mi limits: cpu: 2 memory: 2Gi volumeMounts: - name: config-volume mountPath: /config volumes: - name: config-volume configMap: name: cloudwatch-config EOF
将本地文件写入到s3
import boto3
from botocore.exceptions import ClientError
import logging
import os
import argparse
import sys
# Configure logging so upload progress and failures are easy to trace.
logging.basicConfig(level=logging.INFO)
def get_content_type_and_encoding(file_name):
    """Return the (Content-Type, Content-Encoding) pair for a file name.

    The mapping is driven by the file extension. Matching is now
    case-insensitive (generalization: 'APP.JS' behaves like 'app.js'),
    and compound suffixes ('.wasm.gz', '.data.gz', '.js.gz') are checked
    before the plain '.js' suffix.

    Args:
        file_name: File name or path whose extension determines the type.

    Returns:
        tuple: (content_type, content_encoding) where content_encoding is
        'gzip' for pre-compressed assets and None otherwise. Unknown
        extensions fall back to ('application/octet-stream', None).
    """
    # Longest suffixes first so '.js.gz' is never shadowed by '.js'.
    suffix_map = (
        ('.wasm.gz', ('application/wasm', 'gzip')),
        ('.data.gz', ('application/octet-stream', 'gzip')),
        ('.js.gz', ('application/javascript', 'gzip')),
        ('.js', ('application/javascript', None)),
    )
    lowered = file_name.lower()
    for suffix, result in suffix_map:
        if lowered.endswith(suffix):
            return result
    # Default for anything unrecognized.
    return 'application/octet-stream', None
def upload_file_to_s3(file_name, bucket, object_name=None):
    """Upload one local file to an S3 bucket with content metadata.

    The object is stored with a Content-Type derived from the file
    extension, an optional Content-Encoding for pre-gzipped assets, and a
    'public-read' ACL.

    Args:
        file_name: Path of the local file to upload.
        bucket: Target S3 bucket name.
        object_name: Key for the object in S3 (defaults to file_name).

    Returns:
        True on success, False when the upload fails or the file is missing.
    """
    object_name = file_name if object_name is None else object_name

    # Derive metadata from the extension and assemble the upload options.
    content_type, content_encoding = get_content_type_and_encoding(file_name)
    upload_options = {'ContentType': content_type, 'ACL': 'public-read'}
    if content_encoding:
        upload_options['ContentEncoding'] = content_encoding

    client = boto3.client('s3')
    try:
        # ExtraArgs carries the metadata and ACL alongside the transfer.
        client.upload_file(
            Filename=file_name,
            Bucket=bucket,
            Key=object_name,
            ExtraArgs=upload_options,
        )
    except ClientError as e:
        logging.error(f"文件上传失败: {e}")
        return False
    except FileNotFoundError:
        logging.error(f"本地文件 {file_name} 未找到。")
        return False

    logging.info(f"文件 {file_name} 已成功上传到存储桶 {bucket} 作为 {object_name},Content-Type: {content_type}" +
                 (f", Content-Encoding: {content_encoding}" if content_encoding else ""))
    return True
def upload_files_from_directory(directory_path, bucket, prefix_and_version):
    """Bulk-upload every regular file in a directory to S3.

    Args:
        directory_path: Local directory to scan (not recursive).
        bucket: Target S3 bucket name.
        prefix_and_version: Key prefix, e.g. 'game/next/0.1.1'; each file
            is stored as '<prefix_and_version>/<file_name>'.

    Returns:
        tuple: (number of successful uploads, number of files attempted).
        Returns (0, 0) when the directory does not exist.
    """
    if not os.path.exists(directory_path):
        logging.error(f"目录 {directory_path} 不存在")
        return 0, 0
    # Bug fix: only count regular files. Previously total_files included
    # subdirectories (which are skipped during upload), so success_count
    # could never equal total_files and the caller exited non-zero even
    # when every file uploaded successfully.
    file_names = [
        name for name in os.listdir(directory_path)
        if not os.path.isdir(os.path.join(directory_path, name))
    ]
    total_files = len(file_names)
    success_count = 0
    logging.info(f"开始上传 {total_files} 个文件到S3...")
    for file_name in file_names:
        local_file_path = os.path.join(directory_path, file_name)
        # Build the S3 object key under the version prefix.
        s3_object_key = f'{prefix_and_version}/{file_name}'
        if upload_file_to_s3(local_file_path, bucket, s3_object_key):
            success_count += 1
        else:
            logging.error(f"文件 {file_name} 上传失败")
    logging.info(f"上传完成:成功 {success_count}/{total_files} 个文件")
    return success_count, total_files
# 使用示例
if __name__ == "__main__":
    # Build the command-line interface.
    cli = argparse.ArgumentParser(description='上传文件到S3存储桶')
    cli.add_argument('prefix_and_version', help='游戏版本路径前缀 (例如: fruit-archery/next/0.1.1)')
    cli.add_argument('--files-dir', default='files', help='本地文件目录路径 (默认: files)')
    cli.add_argument('--bucket', default='toggee-build', help='S3存储桶名称 (默认: toggee-build)')
    args = cli.parse_args()

    prefix_and_version = args.prefix_and_version
    local_files_directory = args.files_dir
    s3_bucket_name = args.bucket

    print(f"开始上传文件...")
    print(f"前缀+版本: {prefix_and_version}")
    print(f"本地文件目录: {local_files_directory}")
    print(f"S3存储桶: {s3_bucket_name}")
    print("-" * 50)

    # Upload every regular file in the directory under the given prefix.
    success_count, total_count = upload_files_from_directory(
        local_files_directory, s3_bucket_name, prefix_and_version
    )

    # Exit 0 only when every file made it; 1 otherwise.
    if success_count == total_count and total_count > 0:
        print(f"所有文件上传成功!({success_count}/{total_count})")
        sys.exit(0)
    elif success_count > 0:
        print(f"部分文件上传成功:{success_count}/{total_count}")
        sys.exit(1)
    else:
        print("上传失败,请检查日志和配置。")
        sys.exit(1)

将cloudfront日志写入到本地ES
import boto3
from elasticsearch import Elasticsearch, helpers
from datetime import datetime, timedelta, timezone
import time
import os
import json
import urllib.parse as urlparse
# AWS credential configuration.
# Option 1: read from environment variables (recommended).
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.getenv('AWS_DEFAULT_REGION', 'us-east-1')

# Option 2: hard-code in source (testing only; never in production).
# AWS_ACCESS_KEY_ID = 'YOUR_ACCESS_KEY_ID'
# AWS_SECRET_ACCESS_KEY = 'YOUR_SECRET_ACCESS_KEY'
# AWS_REGION = 'us-east-1'

# Runtime settings (each overridable via an environment variable).
LOG_GROUP_NAME = os.getenv('CWL_LOG_GROUP', 'toggee-build')  # CloudWatch log group to tail
ES_URL = os.getenv('ES_URL', 'http://172.16.37.126:9200')  # Elasticsearch endpoint
ES_INDEX_PREFIX = os.getenv('ES_INDEX_PREFIX', 'cloudwatch-cloudfront-logs')  # daily index prefix
POLL_INTERVAL_SECONDS = int(os.getenv('POLL_INTERVAL_SECONDS', '10'))  # seconds between polls
BACKFILL_MINUTES = int(os.getenv('BACKFILL_MINUTES', '10'))  # initial lookback window
CHECKPOINT_PATH = os.getenv('CHECKPOINT_PATH', './cwl_to_es_checkpoint.json')  # progress file

# AWS CloudWatch Logs client (created once at import time).
cloudwatch_logs = boto3.client(
    'logs',
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# Elasticsearch client (created once at import time).
es = Elasticsearch([ES_URL])
def parse_cloudfront_message(message_str, cw_event_ts_ms=None):
    """Parse a CloudFront-style JSON log line into normalized fields.

    Field normalization: the literal '-' becomes None, a whitelist of
    percent-encoded fields is URL-decoded, and numeric-looking strings are
    coerced to int or float. An ISO-8601 UTC timestamp for Elasticsearch
    is derived from, in order of preference: 'timestamp(ms)', 'timestamp'
    (seconds), 'date' + 'time', or the CloudWatch event timestamp supplied
    via cw_event_ts_ms.

    Returns:
        (parsed_dict, iso_timestamp), or (None, None) when message_str is
        not a string holding a JSON object.
    """
    if not isinstance(message_str, str):
        return None, None
    try:
        data = json.loads(message_str)
    except Exception:
        return None, None
    if not isinstance(data, dict):
        return None, None

    # Fields that may arrive percent-encoded and should be decoded.
    percent_encoded = {
        "cs(Referer)",
        "cs(User-Agent)",
        "cs-uri-stem",
        "cs-uri-query",
        "cs(Host)"
    }

    def normalize(field, raw):
        # '-' is CloudFront's "no value" marker.
        if raw == '-':
            return None
        if field in percent_encoded:
            try:
                raw = urlparse.unquote_plus(raw)
            except Exception:
                pass
        # Integer-looking strings (optionally negative) become ints.
        if raw.isdigit() or (raw.startswith('-') and raw[1:].isdigit()):
            return int(raw)
        # Anything float() accepts becomes a float; otherwise keep the string.
        try:
            return float(raw)
        except Exception:
            return raw

    parsed = {}
    for field, raw in data.items():
        parsed[field] = normalize(field, raw) if isinstance(raw, str) else raw

    # Derive the epoch-millisecond timestamp, most precise source first.
    ts_ms = None
    ms_field = parsed.get('timestamp(ms)')
    if isinstance(ms_field, (int, float)):
        ts_ms = int(ms_field)
    if ts_ms is None:
        sec_field = parsed.get('timestamp')
        if isinstance(sec_field, (int, float)):
            ts_ms = int(float(sec_field) * 1000)
    # Fall back to combining the 'date' and 'time' fields (assumed UTC).
    if ts_ms is None and isinstance(parsed.get('date'), str) and isinstance(parsed.get('time'), str):
        try:
            combined = datetime.strptime(f"{parsed['date']} {parsed['time']}", "%Y-%m-%d %H:%M:%S")
            ts_ms = int(combined.replace(tzinfo=timezone.utc).timestamp() * 1000)
        except Exception:
            pass
    # Last resort: the CloudWatch event timestamp.
    if ts_ms is None and cw_event_ts_ms is not None:
        ts_ms = int(cw_event_ts_ms)

    iso_ts = None
    if ts_ms is not None:
        iso_ts = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).isoformat()
    return parsed, iso_ts
def get_log_events(log_group_name, start_time, end_time):
    """Fetch all log events in a time range from a CloudWatch log group.

    Follows filter_log_events pagination via nextToken until exhausted.

    Args:
        log_group_name: CloudWatch log group name.
        start_time: Range start (datetime or epoch-millisecond int).
        end_time: Range end (datetime or epoch-millisecond int).

    Returns:
        List of event dicts; may be partial if an API error occurs
        mid-pagination (the error is printed, not raised).
    """
    # Normalize datetimes to the epoch milliseconds the API expects.
    if isinstance(start_time, datetime):
        start_time = int(start_time.timestamp() * 1000)
    if isinstance(end_time, datetime):
        end_time = int(end_time.timestamp() * 1000)

    collected = []
    token = None
    try:
        while True:
            request = {
                'logGroupName': log_group_name,
                'startTime': start_time,
                'endTime': end_time,
            }
            if token:
                request['nextToken'] = token
            page = cloudwatch_logs.filter_log_events(**request)
            collected.extend(page.get('events', []))
            token = page.get('nextToken')
            if not token:
                break
    except Exception as e:
        print(f"获取日志时出错: {e}")
    return collected
def prepare_actions_for_es(log_events, index_name):
    """Convert CloudWatch log events into Elasticsearch bulk index actions.

    Each action's _source carries the raw CloudWatch metadata, the parsed
    CloudFront fields flattened to the top level (for easy querying), and
    an '@timestamp' (falling back to "now" when none can be derived).

    Args:
        log_events: Event dicts as returned by get_log_events.
        index_name: Target Elasticsearch index.

    Returns:
        List of action dicts suitable for helpers.bulk.
    """
    actions = []
    for event in log_events:
        raw = event.get('message')
        fields, iso_ts = parse_cloudfront_message(raw, cw_event_ts_ms=event.get('timestamp'))
        body = {
            # Original CloudWatch metadata.
            'cloudwatch_timestamp_ms': event.get('timestamp'),
            'cloudwatch_logStreamName': event.get('logStreamName'),
            'cloudwatch_ingestionTime': event.get('ingestionTime'),
            'cloudwatch_eventId': event.get('eventId'),
            'message_raw': raw,
        }
        # Flatten parsed fields to the top level to avoid deep nesting.
        if isinstance(fields, dict):
            body.update(fields)
        body['@timestamp'] = iso_ts or datetime.now(timezone.utc).isoformat()
        actions.append({
            '_op_type': 'index',
            '_index': index_name,
            '_source': body
        })
    return actions
def load_checkpoint(path):
    """Read the checkpoint file and return the last-seen epoch-millisecond
    timestamp, or None when the file is missing or unreadable."""
    try:
        if not os.path.exists(path):
            return None
        with open(path, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)
        # Stored value is an epoch-millisecond timestamp.
        return int(payload.get('last_timestamp_ms'))
    except Exception as e:
        print(f"读取检查点失败: {e}")
        return None
def save_checkpoint(path, last_ts_ms):
    """Persist the last-seen epoch-millisecond timestamp atomically
    (write a temp file, then rename it over the checkpoint)."""
    try:
        tmp = f"{path}.tmp"
        with open(tmp, 'w', encoding='utf-8') as fh:
            json.dump({'last_timestamp_ms': int(last_ts_ms)}, fh)
        # os.replace is atomic on the same filesystem.
        os.replace(tmp, path)
    except Exception as e:
        print(f"保存检查点失败: {e}")
def index_name_for_datetime(prefix, dt):
    """Return the daily index name '<prefix>-YYYY.MM.DD' for datetime dt."""
    return "{}-{}".format(prefix, dt.strftime('%Y.%m.%d'))
def continuous_sync():
    """Poll CloudWatch Logs forever and bulk-index new events into
    Elasticsearch, persisting progress in a checkpoint file.

    Resumes from the checkpoint when one exists, otherwise backfills
    BACKFILL_MINUTES. Runs until KeyboardInterrupt or an unhandled error.

    NOTE(review): the checkpoint advances to the newest event's timestamp
    each cycle; events ingested late with older timestamps would be
    skipped on the next poll — confirm this is acceptable for the log
    group's delivery pattern.
    """
    # Initial start point: checkpoint, or the backfill window.
    checkpoint_ms = load_checkpoint(CHECKPOINT_PATH)
    if checkpoint_ms is None:
        start_dt = datetime.now(timezone.utc) - timedelta(minutes=BACKFILL_MINUTES)
        start_ms = int(start_dt.timestamp() * 1000)
    else:
        # +1 ms so the last indexed event is not fetched again.
        start_ms = checkpoint_ms + 1
    print(f"开始持续同步。日志组='{LOG_GROUP_NAME}', ES='{ES_URL}', 索引前缀='{ES_INDEX_PREFIX}'")
    print(f"初始起点(ms): {start_ms}")
    try:
        while True:
            end_dt = datetime.now(timezone.utc)
            end_ms = int(end_dt.timestamp() * 1000)
            # Read the [start_ms, end_ms] window.
            events = get_log_events(LOG_GROUP_NAME, start_ms, end_ms)
            if events:
                # Events may cross midnight: sort, then route each event to
                # the daily index matching its own timestamp.
                events_sorted = sorted(events, key=lambda e: (e.get('timestamp') or 0, e.get('eventId') or ''))
                # Accumulate one bulk batch covering all per-day indices.
                batch_actions = []
                for ev in events_sorted:
                    ev_ts = ev.get('timestamp') or end_ms
                    ev_dt = datetime.fromtimestamp(ev_ts / 1000.0, tz=timezone.utc)
                    idx = index_name_for_datetime(ES_INDEX_PREFIX, ev_dt)
                    batch_actions.extend(prepare_actions_for_es([ev], idx))
                try:
                    success, failed = helpers.bulk(es, batch_actions, stats_only=True)
                    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 写入 ES 成功: {success} 条;时间段 [{start_ms}, {end_ms}];事件数 {len(events_sorted)}")
                except Exception as es_err:
                    print(f"批量写入 ES 出错: {es_err}")
                # Advance the checkpoint to the last event's timestamp.
                last_event_ts = events_sorted[-1].get('timestamp') or end_ms
                save_checkpoint(CHECKPOINT_PATH, last_event_ts)
                start_ms = last_event_ts + 1
            else:
                # No new data; keep the window start where it is.
                pass
            time.sleep(POLL_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        print("收到中断信号,安全退出。")
    except Exception as e:
        print(f"持续同步出错: {e}")
def main():
    """Entry point: print the effective configuration, then run the
    continuous CloudWatch -> Elasticsearch sync loop (blocks forever)."""
    # Continuous-sync mode only; no one-shot option is implemented.
    print("启动持续日志同步模式...")
    print(f"配置: 日志组={LOG_GROUP_NAME}, ES={ES_URL}, 前缀={ES_INDEX_PREFIX}, 轮询={POLL_INTERVAL_SECONDS}s, 回溯={BACKFILL_MINUTES}min, 检查点={CHECKPOINT_PATH}")
    continuous_sync()


if __name__ == "__main__":
    main()