mirror of
https://github.com/ceph/ceph
synced 2025-01-25 04:24:24 +00:00
a7d80da604
example on how to use 'lua scripting' feature to add NATS to the list of bucket notification endpoints depends on PR: #41927 and #42102 Signed-off-by: Matan Breizman <Matan.Brz@gmail.com>
94 lines
3.8 KiB
Lua
94 lines
3.8 KiB
Lua
local json = require ("lunajson")
|
|
local nats = require ("nats")
|
|
|
|
--- Open a connection to a NATS server.
-- The resulting client object is stored in the global `client`, which the
-- rest of this script uses for publishing.
-- @param nats_host  host passed to the NATS library as `host`
-- @param nats_port  port passed to the NATS library as `port`
function nats_connect(nats_host, nats_port)
    local connection_options = {}
    connection_options.host = nats_host
    connection_options.port = nats_port

    -- Intentionally global: the publishing code reads `client` afterwards.
    client = nats.connect(connection_options)
    client:connect()
end
|
|
|
|
--- Build an S3-style bucket notification from a request object.
-- Despite the name, the result is a Lua table; serialization is left to the
-- caller. As a side effect the global `supported_event` is set to true.
-- @param request     request object; reads ZoneGroup.Name, Time, User.Id, Id,
--                    RGWId, Bucket.{Name,Id,User.Id},
--                    Object.{Name,Size,Instance}, HTTP.Metadata and Tags
-- @param eventName   value stored under "eventName"
-- @param opaqueData  value stored under "opaqueData"
-- @param configure   value stored under s3.configurationId
-- @return table with a single "Records" key holding the notification record
function toJson(request, eventName, opaqueData, configure)
    -- Tell the caller that a notification was produced for this request.
    supported_event = true

    local zonegroup_name = request.ZoneGroup.Name

    local record = {}
    record.eventVersion = "2.1"
    record.eventSource = "ceph:s3"
    record.awsRegion = zonegroup_name
    record.eventTime = request.Time
    record.eventName = eventName
    record.userIdentity = { principalId = request.User.Id }
    record.requestParameters = { sourceIPAddress = "" }
    record.responseElements = {
        ["x-amz-request-id"] = request.Id,
        ["x-amz-id-2"] = request.RGWId,
    }

    local bucket = request.Bucket
    local bucket_section = {
        name = bucket.Name,
        ownerIdentity = { principalId = bucket.User.Id },
        arn = "arn:aws:s3:" .. zonegroup_name .. "::" .. bucket.Name,
        id = bucket.Id,
    }

    local object = request.Object
    local object_section = {
        key = object.Name,
        size = object.Size,
        eTag = "", -- eTag is not supported yet
        versionId = object.Instance,
        -- current time rendered as lowercase hex
        sequencer = string.format("%x", os.time()),
        -- metadata and tags are each a one-element list holding the
        -- JSON-encoded string
        metadata = { json.encode(request.HTTP.Metadata) },
        tags = { json.encode(request.Tags) },
    }

    record.s3 = {
        s3SchemaVersion = "1.0",
        configurationId = configure,
        bucket = bucket_section,
        object = object_section,
    }
    record.eventId = ""
    record.opaqueData = opaqueData

    return { Records = record }
end
|
|
|
|
-- Script configuration and state. These are globals read by the code below.

-- Set to true by toJson() once a notification has been built.
supported_event = false

-- Only requests against this bucket produce a notification.
bucket_name = "mybucket"

-- Values copied into every notification record.
configure, opaqueData = "mynotif1", "me@example.com"

-- NATS endpoint and the subject the notification is published on.
nats_host, nats_port = "0.0.0.0", 4222
topic = "Bucket_Notification"
|
|
|
|
-- Publish a bucket notification to NATS for supported operations on the
-- configured bucket. `Request` and `RGWDebugLog` are not defined in this
-- file; they are expected to be supplied by the hosting environment.
if bucket_name == Request.Bucket.Name then
    -- Operation name (Request.RGWOp) -> event name placed in the notification.
    local event_names = {
        -- Object Created
        put_obj = 'ObjectCreated:Put',
        post_obj = 'ObjectCreated:Post',
        copy_obj = 'ObjectCreated:Copy',
        -- Object Removed
        delete_obj = 'ObjectRemoved:Delete',
    }
    local event_name = event_names[Request.RGWOp]

    -- Operations missing from the table are unsupported and skipped silently.
    if event_name ~= nil then
        local notification = toJson(Request, event_name, opaqueData, configure)

        -- Pass the configured endpoint explicitly. Calling nats_connect()
        -- with no arguments would hand nil host/port to the NATS library,
        -- so the settings above would be ignored while the log line below
        -- still reported them.
        nats_connect(nats_host, nats_port)

        local payload = json.encode(notification)
        -- `client` is the global set by nats_connect().
        client:publish(topic, payload)
        RGWDebugLog("bucket notification sent to nats://" .. nats_host .. ":" .. nats_port .. "/" .. topic)
    end
end
|