使用aws cli连接eks


下载命令行:

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"

解压:

unzip awscliv2.zip

安装:

./aws/install

配置access_key和access_secret:

]# aws configure

配置文件:

]# cd .aws/
]# cat config
[default]
region = us-west-1
]# cat credentials 
[default]
aws_access_key_id = xxxxxxxxx
aws_secret_access_key = xxxxxxxxxxxxxxxxx

更新kubeconfig配置:

aws eks update-kubeconfig --name <cluster-name>

查看资源:

]# kubectl get pods -A
NAMESPACE     NAME                      READY   STATUS    RESTARTS   AGE
kube-system   aws-node-j8gpf            1/1     Running   0          7h23m
kube-system   aws-node-pm9dd            1/1     Running   0          7h23m
kube-system   coredns-db9fb9979-7jmhl   1/1     Running   0          7h21m
kube-system   coredns-db9fb9979-xq6zn   1/1     Running   0          7h21m
kube-system   kube-proxy-2rnhl          1/1     Running   0          7h23m
kube-system   kube-proxy-xzpbm          1/1     Running   0          7h23m

示例:

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
apt install unzip
unzip awscliv2.zip
sudo ./aws/install
/usr/local/bin/aws --version
aws configure
aws --version
aws s3 ls
aws eks update-kubeconfig --region us-west-2 --name eks-dev
vim .kube/config
kubectl get nodes



eks上安装aws原生的ingressController


安装eksctl:

curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
sudo mv /tmp/eksctl /usr/local/bin
eksctl version

搞OIDC:

oidc_id=$(aws eks describe-cluster --name mexico --query "cluster.identity.oidc.issuer" --output text | cut -d '/' -f 5)
aws iam list-open-id-connect-providers | grep $oidc_id | cut -d "/" -f4
eksctl utils associate-iam-oidc-provider --cluster mexico --approve

策略:

]# aws iam create-policy --policy-name AWSLoadBalancerControllerIAMPolicy --policy-document file://iam_policy.json
{
    "Policy": {
        "PolicyName": "AWSLoadBalancerControllerIAMPolicy",
        "PolicyId": "ANPAQWDHG3XXXXXXXXXXXXXX",
        "Arn": "arn:aws:iam::011111111111:policy/AWSLoadBalancerControllerIAMPolicy",
        "Path": "/",
        "DefaultVersionId": "v1",
        "AttachmentCount": 0,
        "PermissionsBoundaryUsageCount": 0,
        "IsAttachable": true,
        "CreateDate": "2023-03-13T07:22:07+00:00",
        "UpdateDate": "2023-03-13T07:22:07+00:00"
    }
}

创建serviceaccount:

]# eksctl create iamserviceaccount \
> --cluster=mexico \
> --namespace=kube-system \
> --name=aws-load-balancer-controller \
> --role-name AmazonEKSLoadBalancerControllerRole \
> --attach-policy-arn=arn:aws:iam::01111111111:policy/AWSLoadBalancerControllerIAMPolicy \
> --approve
2023-03-13 15:28:47 [ℹ]  1 iamserviceaccount (kube-system/aws-load-balancer-controller) was included (based on the include/exclude rules)
2023-03-13 15:28:47 [!]  serviceaccounts that exist in Kubernetes will be excluded, use --override-existing-serviceaccounts to override
2023-03-13 15:28:47 [ℹ]  1 task: { 
    2 sequential sub-tasks: { 
        create IAM role for serviceaccount "kube-system/aws-load-balancer-controller",
        create serviceaccount "kube-system/aws-load-balancer-controller",
    } }2023-03-13 15:28:47 [ℹ]  building iamserviceaccount stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:28:47 [ℹ]  deploying stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:28:48 [ℹ]  waiting for CloudFormation stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:29:19 [ℹ]  waiting for CloudFormation stack "eksctl-mexico-addon-iamserviceaccount-kube-system-aws-load-balancer-controller"
2023-03-13 15:29:20 [ℹ]  created serviceaccount "kube-system/aws-load-balancer-controller"

用helm安装:

helm repo add eks https://aws.github.io/eks-charts
helm repo update

安装ingressController

]# helm install aws-load-balancer-controller eks/aws-load-balancer-controller \
>   -n kube-system \
>   --set clusterName=mexico \
>   --set serviceAccount.create=false \
>   --set serviceAccount.name=aws-load-balancer-controller
WARNING: Kubernetes configuration file is group-readable. This is insecure. Location: /root/.kube/config
WARNING: Kubernetes configuration file is world-readable. This is insecure. Location: /root/.kube/config
NAME: aws-load-balancer-controller
LAST DEPLOYED: Mon Mar 13 15:41:36 2023
NAMESPACE: kube-system
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
AWS Load Balancer controller installed!

]# kubectl get ingressclass

NAME    CONTROLLER             PARAMETERS   AGE
alb     ingress.k8s.aws/alb    <none>       49s

查看资源:

]# kubectl get pods -n kube-system -l app.kubernetes.io/instance=aws-load-balancer-controller
NAME                                            READY   STATUS    RESTARTS   AGE
aws-load-balancer-controller-797bbbd7fc-jgwtq   1/1     Running   0          2m9s
aws-load-balancer-controller-797bbbd7fc-njvpk   1/1     Running   0          2m9s

aws ingress配置:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:us-west-1:011111111:certificate/xxxxxxx-c1ae-xxxxxxxx-xxxxxxxx
    alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS":443}, {"HTTP":80}]'
    alb.ingress.kubernetes.io/scheme: internet-facing
    alb.ingress.kubernetes.io/ssl-redirect: "443"
    # 需要配置子网subnet或在子网中加上标签,否则不会自动创建alb,详见 docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
    alb.ingress.kubernetes.io/subnets: subnet-05008fbb31930c7c0, subnet-071e07e83dd5146f5
    alb.ingress.kubernetes.io/target-type: ip
  name: admin-api
  namespace: prd
spec:
  ingressClassName: alb
  rules:
  - host: api.scriptjc.com
    http:
      paths:
      - backend:
          service:
            name: api
            port:
              number: 80
        path: /
        pathType: Prefix

参考文档:

docs.aws.amazon.com/zh_cn/eks/latest/userguide/aws-load-balancer-controller.html 
docs.aws.amazon.com/zh_cn/eks/latest/userguide/eksctl.html

# ingress annotations 的配置
kubernetes-sigs.github.io/aws-load-balancer-controller/v2.2/guide/ingress/annotations/#annotations

aws上安装prometheus监控:包括创建iam角色,gp2的csi插件等。

dev.to/aws-builders/monitoring-eks-cluster-with-prometheus-and-grafana-1kpb



aws 升级kubectl


使用了新版本的kubectl报错如下:报错显示需要更新kubeconfig文件。

]# kubectl get pods -A
Unable to connect to the server: getting credentials: exec plugin is configured to use API version client.authentication.k8s.io/v1beta1, plugin returned version client.authentication.k8s.io/v1alpha1

先更新awscli工具:

]# pip3 install --upgrade awscli

再次使用命令,发现已经没有报错了,不过还需要更新kubeconfig文件。

]# kubectl get pods 
Kubeconfig user entry is using deprecated API version client.authentication.k8s.io/v1alpha1. Run 'aws eks update-kubeconfig' to update.

更新 kubeconfig 文件:

]# aws eks update-kubeconfig --region us-west-2 --name eks-dev

--region us-west-2 # 区域名称可在aws中查看
--name eks-dev # 集群名称就是创建集群时的eks集群名称

再次使用命令,此时就没有任何提示和报错了

]# kubectl get pods
No resources found in default namespace.

升级helm:

curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh



aws中eks创建storageclass


一、安装aws-ebs-csi-driver:

1、安装eksctl:

curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
mv /tmp/eksctl /usr/bin
eksctl version

2、创建associate-iam-oidc-provider:

~]# eksctl utils associate-iam-oidc-provider \
   --region us-west-2 \
   --cluster eks-prod \
   --approve

验证结果:

~]# aws eks describe-cluster --name eks-prod --query "cluster.identity.oidc.issuer" --output text
https://oidc.eks.us-west-2.amazonaws.com/id/xxxxxxxxxxxxxxxxx

3、创建 iamserviceaccount

~]# eksctl create iamserviceaccount \
   --name ebs-csi-controller-sa \
   --namespace kube-system \
   --cluster eks-prod \
   --role-name AmazonEKS_EBS_CSI_DriverRole \
   --role-only \
   --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
   --approve

4、安装addon:安装 aws-ebs-csi-driver

~]# eksctl create addon \
   --name aws-ebs-csi-driver \
   --cluster eks-prod \
   --service-account-role-arn arn:aws:iam::123123123123:role/AmazonEKS_EBS_CSI_DriverRole \
   --force

二、创建storageclass:

~]# cat storageclass.yaml 
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: ebs3
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
provisioner: ebs.csi.aws.com
volumeBindingMode: WaitForFirstConsumer
parameters:
  type: gp3

创建pvc:

~]# cat demo-pvc.yaml 
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: demo-pvc
spec:
  storageClassName: ebs3
  accessModes: [ReadWriteOnce]
  resources:
    requests:
      storage: 1G

创建Pod:

~]# cat demo-prod.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: demo-pod
spec:
  containers:
  - name: app
    image: nginx
    volumeMounts:
    - name: data
      mountPath: /usr/share/nginx/html
  volumes:
  - name: data
    persistentVolumeClaim:
      claimName: demo-pvc



导出cloudwatch监控数据到prometheus


项目地址:

        这个项目可以将aws中服务的监控信息导出,然后使用prometheus去抓取exporter里的数据。如果只是想看数据的话则不必部署 cloudwatch_exporter:可以在prometheus中的data source里将cloudwatch作为数据源(需要填写AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY),然后导入grafana面板即可浏览监控数据。

github.com/prometheus/cloudwatch_exporter

配置文件:

cat > secret.yaml  <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: cloudwatch-exporter-secret
  namespace: cloudwatch
type: Opaque
data:
  AWS_ACCESS_KEY_ID: QAtJxxxxxxxxxxxxxxgK # base64
  AWS_SECRET_ACCESS_KEY: PktoNE1ZalpLexxxxxxxxxxxxxxzdUZzJXbk5YV2FXYQo= # base64
EOF

# 这个配置在项目的sample目录里有
cat > configmap.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: cloudwatch-config
  namespace: cloudwatch
data:
  config.yml: |
    region: us-west-2
    metrics:
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HealthyHostCount
      aws_dimensions:
      - LoadBalancer
      - TargetGroup
      aws_statistics:
      - Minimum
      aws_tag_select:
        resource_type_selection: elasticloadbalancing:targetgroup
        resource_id_dimension: TargetGroup
        arn_resource_id_regexp: "(targetgroup/.*)$"
        tag_selections:
          Environment:
          - production
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: UnHealthyHostCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: RequestCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Average
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: TargetResponseTime
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Average
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: ActiveConnectionCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: NewConnectionCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: RejectedConnectionCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: TargetConnectionErrorCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: RequestCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: IPv6RequestCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: RequestCountPerTarget
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: NonStickyRequestCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_Target_2XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_Target_3XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_Target_4XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_Target_5XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_ELB_3XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_ELB_4XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: HTTPCode_ELB_5XX_Count
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: ProcessedBytes
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: IPv6ProcessedBytes
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: ConsumedLCUs
      delay_seconds: 1800
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: ClientTLSNegotiationErrorCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: TargetTLSNegotiationErrorCount
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
    - aws_namespace: AWS/ApplicationELB
      aws_metric_name: RuleEvaluations
      aws_dimensions:
      - LoadBalancer
      aws_statistics:
      - Sum
EOF

cat > cloudwatch-exporter.yaml <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cloudwatch-exporter
  namespace: cloudwatch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: cloudwatch-exporter
  template:
    metadata:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "9106"
        prometheus.io/scheme: http
        prometheus.io/scrape: "true"
      labels:
        app: cloudwatch-exporter
    spec:
      containers:
        - name: cloudwatch-exporter
          image: quay.io/prometheus/cloudwatch-exporter:latest
          env:
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: cloudwatch-exporter-secret
                  key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: cloudwatch-exporter-secret
                  key: AWS_SECRET_ACCESS_KEY
          ports:
            - containerPort: 9106
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
            limits:
              cpu: 2
              memory: 2Gi
          volumeMounts:
            - name: config-volume
              mountPath: /config
      volumes:
        - name: config-volume
          configMap:
            name: cloudwatch-config
EOF


将本地文件写入到s3


import boto3
from botocore.exceptions import ClientError
import logging
import os
import argparse
import sys

# 配置日志记录,便于排查问题
logging.basicConfig(level=logging.INFO)

def get_content_type_and_encoding(file_name):
    """
    Map a file name to its S3 Content-Type and Content-Encoding.

    Args:
        file_name: name (or path) of the file being uploaded.

    Returns:
        tuple: (content_type, content_encoding), where content_encoding is
        'gzip' for pre-compressed assets and None otherwise. Unknown suffixes
        fall back to ('application/octet-stream', None).
    """
    # Ordered suffix table; first match wins.
    suffix_rules = (
        ('.js', ('application/javascript', None)),
        ('.wasm.gz', ('application/wasm', 'gzip')),
        ('.data.gz', ('application/octet-stream', 'gzip')),
        ('.js.gz', ('application/javascript', 'gzip')),
    )
    for suffix, result in suffix_rules:
        if file_name.endswith(suffix):
            return result
    # Default for anything unrecognized.
    return 'application/octet-stream', None

def upload_file_to_s3(file_name, bucket, object_name=None):
    """
    Upload one local file to an S3 bucket with content metadata and a
    public-read ACL.

    Args:
        file_name: local path of the file to upload.
        bucket: target S3 bucket name.
        object_name: S3 object key (defaults to the local file name).

    Returns:
        True on success, False if the upload fails or the file is missing.
    """
    # Default the object key to the local file name.
    if object_name is None:
        object_name = file_name

    # Derive Content-Type / Content-Encoding from the file suffix.
    content_type, content_encoding = get_content_type_and_encoding(file_name)

    extra_args = {
        'ContentType': content_type,
        'ACL': 'public-read',
    }
    # Only set Content-Encoding when the asset is pre-compressed.
    if content_encoding:
        extra_args['ContentEncoding'] = content_encoding

    s3_client = boto3.client(
        's3',
    )

    try:
        # upload_file applies metadata and ACL via ExtraArgs.
        s3_client.upload_file(
            Filename=file_name,
            Bucket=bucket,
            Key=object_name,
            ExtraArgs=extra_args
        )
    except ClientError as e:
        logging.error(f"文件上传失败: {e}")
        return False
    except FileNotFoundError:
        logging.error(f"本地文件 {file_name} 未找到。")
        return False

    summary = f"文件 {file_name} 已成功上传到存储桶 {bucket} 作为 {object_name},Content-Type: {content_type}"
    if content_encoding:
        summary += f", Content-Encoding: {content_encoding}"
    logging.info(summary)
    return True


def upload_files_from_directory(directory_path, bucket, prefix_and_version):
    """
    Upload every regular file in a directory (non-recursive) to S3.

    Args:
        directory_path: local directory to scan.
        bucket: target S3 bucket name.
        prefix_and_version: key prefix, e.g. 'game/next/0.1.1'.

    Returns:
        tuple: (success_count, total_count). NOTE: total_count counts all
        directory entries, including skipped subdirectories, matching the
        original accounting.
    """
    if not os.path.exists(directory_path):
        logging.error(f"目录 {directory_path} 不存在")
        return 0, 0

    entries = os.listdir(directory_path)
    total_files = len(entries)
    success_count = 0

    logging.info(f"开始上传 {total_files} 个文件到S3...")

    for file_name in entries:
        local_file_path = os.path.join(directory_path, file_name)

        # Only flat files are uploaded; subdirectories are skipped.
        if os.path.isdir(local_file_path):
            continue

        # Compose the S3 object key under the version prefix.
        s3_object_key = f'{prefix_and_version}/{file_name}'

        if upload_file_to_s3(local_file_path, bucket, s3_object_key):
            success_count += 1
        else:
            logging.error(f"文件 {file_name} 上传失败")

    logging.info(f"上传完成:成功 {success_count}/{total_files} 个文件")
    return success_count, total_files


# 使用示例
if __name__ == "__main__":
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description='上传文件到S3存储桶')
    parser.add_argument('prefix_and_version', help='游戏版本路径前缀 (例如: fruit-archery/next/0.1.1)')
    parser.add_argument('--files-dir', default='files', help='本地文件目录路径 (默认: files)')
    parser.add_argument('--bucket', default='toggee-build', help='S3存储桶名称 (默认: toggee-build)')
    
    # 解析命令行参数
    args = parser.parse_args()
    
    # 配置参数
    prefix_and_version = args.prefix_and_version
    local_files_directory = args.files_dir
    s3_bucket_name = args.bucket

    print(f"开始上传文件...")
    print(f"前缀+版本: {prefix_and_version}")
    print(f"本地文件目录: {local_files_directory}")
    print(f"S3存储桶: {s3_bucket_name}")
    print("-" * 50)

    # 批量上传files目录中的所有文件
    success_count, total_count = upload_files_from_directory(local_files_directory, s3_bucket_name, prefix_and_version)
    
    if success_count == total_count and total_count > 0:
        print(f"所有文件上传成功!({success_count}/{total_count})")
        sys.exit(0)
    elif success_count > 0:
        print(f"部分文件上传成功:{success_count}/{total_count}")
        sys.exit(1)
    else:
        print("上传失败,请检查日志和配置。")
        sys.exit(1)


将cloudfront日志写入到本地ES


import boto3
from elasticsearch import Elasticsearch, helpers
from datetime import datetime, timedelta, timezone
import time
import os
import json
import urllib.parse as urlparse

# AWS credential configuration
# Option 1: read from environment variables (recommended)
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.getenv('AWS_DEFAULT_REGION', 'us-east-1')

# Option 2: hard-code in source (testing only; never do this in production)
# AWS_ACCESS_KEY_ID = 'YOUR_ACCESS_KEY_ID'
# AWS_SECRET_ACCESS_KEY = 'YOUR_SECRET_ACCESS_KEY'
# AWS_REGION = 'us-east-1'

# Runtime parameters (each overridable via environment variable)
LOG_GROUP_NAME = os.getenv('CWL_LOG_GROUP', 'toggee-build')  # CloudWatch log group to read
ES_URL = os.getenv('ES_URL', 'http://172.16.37.126:9200')  # target Elasticsearch endpoint
ES_INDEX_PREFIX = os.getenv('ES_INDEX_PREFIX', 'cloudwatch-cloudfront-logs')  # daily index prefix
POLL_INTERVAL_SECONDS = int(os.getenv('POLL_INTERVAL_SECONDS', '10'))  # sleep between poll windows
BACKFILL_MINUTES = int(os.getenv('BACKFILL_MINUTES', '10'))  # lookback window when no checkpoint exists
CHECKPOINT_PATH = os.getenv('CHECKPOINT_PATH', './cwl_to_es_checkpoint.json')  # last-synced-timestamp file

# AWS CloudWatch Logs client (credentials may be None, falling back to the
# default boto3 credential chain)
cloudwatch_logs = boto3.client(
    'logs',
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# Elasticsearch client
es = Elasticsearch([ES_URL])

def parse_cloudfront_message(message_str, cw_event_ts_ms=None):
    """
    Parse a CloudFront-style JSON log line, normalizing field values and
    deriving an Elasticsearch timestamp.

    Normalization rules (in order, per field):
      * a literal '-' becomes None;
      * a short whitelist of URL-ish fields is percent-decoded;
      * numeric-looking strings are converted to int, then float.

    Args:
        message_str: raw JSON string from the log event.
        cw_event_ts_ms: CloudWatch event timestamp (ms), used as a fallback.

    Returns:
        (parsed_dict, es_timestamp_iso) — or (None, None) when the message
        is not a JSON object.
    """
    if not isinstance(message_str, str):
        return None, None
    try:
        raw = json.loads(message_str)
    except Exception:
        return None, None
    if not isinstance(raw, dict):
        return None, None

    # Fields that may carry percent-encoding worth decoding.
    percent_encoded_keys = {
        "cs(Referer)",
        "cs(User-Agent)",
        "cs-uri-stem",
        "cs-uri-query",
        "cs(Host)"
    }

    def normalize(field, value):
        """Apply the '-'/decode/numeric rules to one field value."""
        if not isinstance(value, str):
            return value
        if value == '-':
            # CloudFront uses '-' for "no value".
            return None
        if field in percent_encoded_keys:
            try:
                value = urlparse.unquote_plus(value)
            except Exception:
                pass
        # Integer first (handles a leading minus), then float, else keep str.
        if value.isdigit() or (value.startswith('-') and value[1:].isdigit()):
            try:
                return int(value)
            except Exception:
                pass
        try:
            return float(value)
        except Exception:
            return value

    parsed = {field: normalize(field, value) for field, value in raw.items()}

    # Derive the @timestamp (ms since epoch), trying sources in priority order:
    # explicit ms field, seconds field, date+time pair, CloudWatch event time.
    ts_ms = None
    ms_val = parsed.get('timestamp(ms)')
    if isinstance(ms_val, (int, float)):
        ts_ms = int(ms_val)

    if ts_ms is None:
        sec_val = parsed.get('timestamp')
        if isinstance(sec_val, (int, float)):
            ts_ms = int(float(sec_val) * 1000)

    if ts_ms is None and isinstance(parsed.get('date'), str) and isinstance(parsed.get('time'), str):
        try:
            dt = datetime.strptime(f"{parsed['date']} {parsed['time']}", "%Y-%m-%d %H:%M:%S")
            ts_ms = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
        except Exception:
            pass

    if ts_ms is None and cw_event_ts_ms is not None:
        ts_ms = int(cw_event_ts_ms)

    es_ts_iso = None
    if ts_ms is not None:
        es_ts_iso = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).isoformat()

    return parsed, es_ts_iso

def get_log_events(log_group_name, start_time, end_time):
    """
    Fetch all log events from a CloudWatch log group in a time window,
    following pagination tokens until exhausted.

    Args:
        log_group_name: CloudWatch log group name.
        start_time: window start (datetime or millisecond timestamp).
        end_time: window end (datetime or millisecond timestamp).

    Returns:
        list of event dicts; may be partial if the API errors mid-pagination
        (the error is printed, not raised).
    """
    def to_millis(value):
        # Accept either datetime objects or raw millisecond timestamps.
        if isinstance(value, datetime):
            return int(value.timestamp() * 1000)
        return value

    window_start = to_millis(start_time)
    window_end = to_millis(end_time)

    collected = []
    page_token = None

    try:
        while True:
            request = {
                'logGroupName': log_group_name,
                'startTime': window_start,
                'endTime': window_end,
            }
            if page_token:
                request['nextToken'] = page_token

            page = cloudwatch_logs.filter_log_events(**request)
            collected.extend(page.get('events', []))

            page_token = page.get('nextToken')
            if not page_token:
                break

    except Exception as e:
        print(f"获取日志时出错: {e}")

    return collected

def prepare_actions_for_es(log_events, index_name):
    """
    Convert CloudWatch log events into Elasticsearch bulk 'index' actions.

    Each action's _source carries the CloudWatch metadata, the raw message,
    the parsed CloudFront fields merged at top level (for easy querying),
    and a normalized @timestamp.

    Args:
        log_events: event dicts as returned by get_log_events.
        index_name: target Elasticsearch index.

    Returns:
        list of action dicts suitable for helpers.bulk.
    """
    actions = []
    for event in log_events:
        raw_message = event.get('message')
        parsed, es_ts_iso = parse_cloudfront_message(raw_message, cw_event_ts_ms=event.get('timestamp'))

        # CloudWatch envelope metadata plus the untouched message.
        source = {
            'cloudwatch_timestamp_ms': event.get('timestamp'),
            'cloudwatch_logStreamName': event.get('logStreamName'),
            'cloudwatch_ingestionTime': event.get('ingestionTime'),
            'cloudwatch_eventId': event.get('eventId'),
            'message_raw': raw_message,
        }

        # Flatten parsed fields to the top level to avoid deep nesting.
        if isinstance(parsed, dict):
            source.update(parsed)

        # Fall back to "now" when no timestamp could be derived.
        source['@timestamp'] = es_ts_iso or datetime.now(timezone.utc).isoformat()

        actions.append({
            '_op_type': 'index',
            '_index': index_name,
            '_source': source
        })
    return actions

def load_checkpoint(path):
    """
    Load the last synced CloudWatch timestamp from a JSON checkpoint file.

    Args:
        path: checkpoint file path.

    Returns:
        The millisecond timestamp as int, or None when the file does not
        exist, cannot be read, or holds no usable 'last_timestamp_ms' value.
    """
    try:
        if not os.path.exists(path):
            return None
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # A missing/absent value is not an error: the caller falls back to
        # the backfill window. (Previously int(None) raised TypeError here
        # and printed a spurious failure message.)
        last_ts = data.get('last_timestamp_ms') if isinstance(data, dict) else None
        if last_ts is None:
            return None
        return int(last_ts)
    except Exception as e:
        print(f"读取检查点失败: {e}")
        return None

def save_checkpoint(path, last_ts_ms):
    """
    Atomically persist the last synced timestamp (ms) as JSON.

    Writes to a temp file first, then os.replace()s it over the target so a
    crash mid-write cannot leave a truncated checkpoint. Errors are printed,
    never raised.
    """
    try:
        staging_path = f"{path}.tmp"
        payload = {'last_timestamp_ms': int(last_ts_ms)}
        with open(staging_path, 'w', encoding='utf-8') as fh:
            json.dump(payload, fh)
        os.replace(staging_path, path)
    except Exception as e:
        print(f"保存检查点失败: {e}")

def index_name_for_datetime(prefix, dt):
    """Return the daily index name for dt, e.g. 'prefix-2023.03.13'."""
    day_suffix = dt.strftime('%Y.%m.%d')
    return prefix + '-' + day_suffix

def continuous_sync():
    """Poll CloudWatch Logs forever and bulk-index new events into ES.

    Each iteration reads the window [start_ms, now], writes events to daily
    indices, then advances the checkpoint to the last event's timestamp.
    The checkpoint is only saved AFTER the bulk write attempt, so a crash
    re-reads (rather than skips) the current window. Runs until
    KeyboardInterrupt or an unhandled error.
    """
    # Initial start point: resume from the checkpoint, or back-fill a window.
    checkpoint_ms = load_checkpoint(CHECKPOINT_PATH)
    if checkpoint_ms is None:
        start_dt = datetime.now(timezone.utc) - timedelta(minutes=BACKFILL_MINUTES)
        start_ms = int(start_dt.timestamp() * 1000)
    else:
        start_ms = checkpoint_ms + 1

    print(f"开始持续同步。日志组='{LOG_GROUP_NAME}', ES='{ES_URL}', 索引前缀='{ES_INDEX_PREFIX}'")
    print(f"初始起点(ms): {start_ms}")

    try:
        while True:
            end_dt = datetime.now(timezone.utc)
            end_ms = int(end_dt.timestamp() * 1000)

            # Read the current window of events.
            events = get_log_events(LOG_GROUP_NAME, start_ms, end_ms)
            if events:
                # Events may span midnight; sort and bucket each event into
                # the daily index matching its own timestamp.
                events_sorted = sorted(events, key=lambda e: (e.get('timestamp') or 0, e.get('eventId') or ''))
                # Build one bulk batch across all days.
                batch_actions = []
                for ev in events_sorted:
                    ev_ts = ev.get('timestamp') or end_ms
                    ev_dt = datetime.fromtimestamp(ev_ts / 1000.0, tz=timezone.utc)
                    idx = index_name_for_datetime(ES_INDEX_PREFIX, ev_dt)
                    batch_actions.extend(prepare_actions_for_es([ev], idx))

                try:
                    success, failed = helpers.bulk(es, batch_actions, stats_only=True)
                    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 写入 ES 成功: {success} 条;时间段 [{start_ms}, {end_ms}];事件数 {len(events_sorted)}")
                except Exception as es_err:
                    print(f"批量写入 ES 出错: {es_err}")

                # Advance the checkpoint to the last event's timestamp.
                # NOTE(review): this advances even if the bulk write failed
                # above, so failed batches are skipped, not retried — confirm
                # this is the intended at-most-once behavior.
                last_event_ts = events_sorted[-1].get('timestamp') or end_ms
                save_checkpoint(CHECKPOINT_PATH, last_event_ts)
                start_ms = last_event_ts + 1
            else:
                # No new data: keep the window start where it is.
                pass

            time.sleep(POLL_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        print("收到中断信号,安全退出。")
    except Exception as e:
        print(f"持续同步出错: {e}")

def main():
    """Entry point: print the effective configuration and run the sync loop."""
    # Continuous-sync mode (blocks until interrupted).
    print("启动持续日志同步模式...")
    print(f"配置: 日志组={LOG_GROUP_NAME}, ES={ES_URL}, 前缀={ES_INDEX_PREFIX}, 轮询={POLL_INTERVAL_SECONDS}s, 回溯={BACKFILL_MINUTES}min, 检查点={CHECKPOINT_PATH}")
    continuous_sync()

if __name__ == "__main__":
    main()