问题描述
collector.conf
collector {
interface = "0.0.0.0"
interface = ${?COLLECTOR_INTERFACE}
port = 8181
port = ${?COLLECTOR_PORT}
# optional SSL/TLS configuration
ssl {
enable = false
enable = ${?COLLECTOR_SSL}
# whether to redirect HTTP to HTTPS
redirect = false
redirect = ${?COLLECTOR_SSL_REDIRECT}
port = 9543
port = ${?COLLECTOR_SSL_PORT}
}
paths {
# "/com.acme/track" = "/com.sNowplowanalytics.sNowplow/tp2"
# "/com.acme/redirect" = "/r/tp2"
# "/com.acme/iglu" = "/com.sNowplowanalytics.iglu/v1"
}
# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
crossDomain {
enabled = false
# Domains that are granted access,*.acme.com will match http://acme.com and http://sub.acme.com
enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
domains = [ "*" ]
domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
# Whether to only grant access to HTTPS or both HTTPS and HTTP sources
secure = true
secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
}
cookie {
enabled = true
enabled = ${?COLLECTOR_COOKIE_ENABLED}
expiration = "365 days"
expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
# Network cookie name
name = zanui_collector_cookie
name = ${?COLLECTOR_COOKIE_NAME}
domains = [
"{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
"{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
# ... more domains
]
domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
fallbackDomain = ""
fallbackDomain = ${?FALLBACK_DOMAIN}
secure = false
secure = ${?COLLECTOR_COOKIE_SECURE}
httpOnly = false
httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
sameSite = "{{cookieSameSite}}"
sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
}
doNottrackCookie {
enabled = false
enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
# name = {{doNottrackCookieName}}
name = zanui-collector-do-not-track-cookie
# value = {{doNottrackCookieValue}}
value = zanui-collector-do-not-track-cookie-value
}
cookieBounce {
enabled = false
enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
name = "n3pc"
name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
forwardedProtocolHeader = "X-Forwarded-Proto"
forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
}
enableDefaultRedirect = true
enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}
redirectMacro {
enabled = false
enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
# Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
placeholder = "[TOKEN]"
placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
}
rootResponse {
enabled = false
enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
statusCode = 302
statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
# Optional,defaults to empty map
headers = {
Location = "https://127.0.0.1/",Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},X-Custom = "something"
}
# Optional,defaults to empty string
body = "302,redirecting"
body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
}
cors {
accessControlMaxAge = 5 seconds
accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
}
# Configuration of prometheus http metrics
prometheusMetrics {
enabled = false
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = sNowplow-collected-good-events-stream
good = ${?COLLECTOR_STREAMS_GOOD}
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = sNowplow-collected-bad-events-stream
bad = ${?COLLECTOR_STREAMS_BAD}
useIpAddressAsPartitionKey = false
useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}
sink {
enabled = kinesis
enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}
# Region where the streams are located
region = ap-southeast-2
region = ${?COLLECTOR_STREAMS_SINK_REGION}
threadPoolSize = 10
threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
aws {
accessKey = env
accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
secretKey = env
secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
}
# Minimum and maximum backoff periods,in milliseconds
backoffPolicy {
#minBackoff = {{minBackoffMillis}}
minBackoff = 10
#maxBackoff = {{maxBackoffMillis}}
maxBackoff = 10
}
}
buffer {
byteLimit = 4500000
byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit =500 # Not supported by Kafka; will be ignored
recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = 5000
timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
}
}
}
akka {
loglevel = DEBUG # 'OFF' for no logging,'DEBUG' for all logging.
loglevel = ${?AKKA_LOGLEVEL}
loggers = ["akka.event.slf4j.Slf4jLogger"]
loggers = [${?AKKA_LOGGERS}]
http.server {
remote-address-header = on
remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}
raw-request-uri-header = on
raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}
# Define the maximum request length (the default is 2048)
parsing {
max-uri-length = 32768
max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
uri-parsing-mode = relaxed
uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
}
}
}
运行命令:
java -Dcom.amazonaws.sdk.disableCbor -jar sNowplow-stream-collector-kinesis-1.0.0.jar --config collector.conf
enricher.conf
enrich {
streams {
in {
# Stream/topic where the raw events to be enriched are located
raw = sNowplow-collected-good-events-stream
raw = ${?ENRICH_STREAMS_IN_RAW}
}
out {
# Stream/topic where the events that were successfully enriched will end up
enriched = sNowplow-collected-good-events-stream
# Stream/topic where the event that Failed enrichment will be stored
bad = sNowplow-collected-bad-events-stream
bad = ${?ENRICH_STREAMS_OUT_BAD}
# Stream/topic where the pii tranformation events will end up
# pii = {{outPii}}
# pii = ${?ENRICH_STREAMS_OUT_PII}
partitionKey = event_id
partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
}
sourceSink {
enabled = kinesis
enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}
region = ap-southeast-2
aws {
accessKey = env
accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
secretKey = env
secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
}
maxRecords = 10000
initialPosition = TRIM_HORIZON
initialTimestamp = "2020-09-10T10:00:00Z"
backoffPolicy {
minBackoff = 1000
minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
maxBackoff = 5000
maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
}
}
buffer {
byteLimit = 1000000000
byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit = 10 # Not supported by Kafka; will be ignored
recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = 5000
timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
}
appName = "zanui-enricher-app"
appName = ${?ENRICH_STREAMS_APP_NAME}
}
}
运行命令:
java -jar sNowplow-stream-enrich-kinesis-1.0.0.jar --config enricher.conf --resolver file:resolver.json
S3加载程序配置
source = "kinesis"
sink = "kinesis"
aws {
accessKey = "env"
secretKey = "env"
}
# Config for NSQ
nsq {
channelName = "nsqSourceChannelName"
# Host name for NSQ tools
host = "127.0.0.1"
# HTTP port for nsqd
port = 4150
# HTTP port for nsqlookupd
lookupPort = 4161
}
kinesis {
initialPosition = "TRIM_HORIZON"
initialTimestamp = "2017-05-17T10:00:00Z"
maxRecords = 10000
region = "ap-southeast-2"
appName = "zanui-enricher-app"
}
streams {
inStreamName = "sNowplow-collected-good-events-stream"
outStreamName = "sNowplow-collected-bad-events-stream"
buffer {
byteLimit = 1000000000 # Not supported by NSQ; will be ignored
recordLimit = 10
timeLimit = 5000 # Not supported by NSQ; will be ignored
}
}
s3 {
region = "ap-southeast-2"
bucket = "sNowplow-enriched-good-events"
partitionedBucket = "sNowplow-enriched-good-events/partitioned"
dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
outputDirectory = "zanui-enriched/good"
filenamePrefix = "zanui-output"
format = "gzip"
# Maximum Timeout that the application is allowed to fail for (in milliseconds)
maxTimeout = 300000 # 5 minutes
}
运行命令:
java -jar sNowplow-s3-loader-0.6.0.jar --config my.conf
但是此SNowplow S3加载程序没有执行任何操作,因此我使用Data Fireshose将流传输到S3存储桶。
当我尝试在Data Fireshose中使用Aws Lambda时,会给出错误消息
{"attemptsMade":4,"arrivalTimestamp":1600154159619,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"****","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:sNowplow-json-transformer-lambda:$LATEST"}
{"attemptsMade":4,"arrivalTimestamp":1600154161523,"rawData":"*****=","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:sNowplow-json-transformer-lambda:$LATEST"}
如果我不使用lambda,则在S3优质浓缩桶中为页面查看事件创建日志,但与此同时,在S3不良浓缩桶中为相同的页面查看事件创建日志
{"schema":"iglu:com.sNowplowanalytics.sNowplow.badrows/collector_payload_format_violation/jsonschema/1-0-0","data":{"processor":{"artifact":"sNowplow-stream-enrich","version":"1.0.0"},"failure":{"timestamp":"2020-09-15T07:16:02.488Z","loader":"thrift","message":{"error":"error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes,but only got 1 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}},"payload":"****="}}
我反复遵循了文档,但是我对流丰富的设置感到困惑。我不明白的是,如果不使用自定义架构,是否需要设置数据库以实现流丰富?因为既然我试图用Javascript Tracker的Page View事件进行测试,所以我没有设置任何数据库。但是我已经提供了DynamoDb创建,IAM角色编辑的访问权限。
如果有人做过,请帮助我设置扫雪机。请:(
解决方法
我写了一个博客,介绍如何在AWS中设置Snowplow Analytics。
这里是link,希望对您有所帮助。