Commit fd515364 authored by Mholloway's avatar Mholloway
Browse files

[Swift] Add support for multiple destination event services

Adds support for sending events to different intake services
based on the stream config. With this change, the library supports
submitting error events to eventgate-logging-external.

Note: This commit drops the inputBuffer/outputBuffer terminology in
favor of more explanatory names: unvalidatedEvents, validatedEvents,
and validatedErrors. The latter are implemented as instances of a
new EventBuffer class which associates a queue of events with a
destination event service.

Bug: T290770
Change-Id: I71466b11f6c3b9f7fbd8c6b15684d92ec1459b49
parent d930a4e2
import Foundation
enum DestinationEventService: String, Decodable {
case analytics = "eventgate-analytics-external"
case errorLogging = "eventgate-logging-external"
}
import Foundation
import DequeModule
class EventBuffer {
var events: Deque<Event>
let destination: URL
init(destination: URL) {
self.events = Deque()
self.destination = destination
}
func prepend(_ event: Event) {
events.prepend(event)
}
func append(_ event: Event) {
events.append(event)
}
func popFirst() -> Event? {
return events.popFirst()
}
}
......@@ -34,7 +34,6 @@
*/
import Foundation
import FoundationNetworking
import DequeModule
/**
* Wikimedia Metrics Client - Swift
......@@ -65,12 +64,17 @@ public class MetricsClient {
* about how to evaluate incoming event data. Until this initialization is complete, we store any incoming
* events in this buffer.
*/
private var inputBuffer: LimitedCapacityDeque<Event> = LimitedCapacityDeque(capacity: 128)
private var unvalidatedEvents: LimitedCapacityDeque<Event> = LimitedCapacityDeque(capacity: 128)
/**
* Holds validated events that have been scheduled for POSTing
* Holds validated events that have been scheduled for POSTing to the analytics event intake service
*/
private var outputBuffer: Deque<Event> = Deque()
private var validatedEvents = EventBuffer(destination: URL(string: "https://intake-analytics.wikimedia.org/v1/events")!)
/**
* Holds validated error events that have been scheduled for POSTing to the error logging intake service
*/
private var validatedErrors = EventBuffer(destination: URL(string: "https://intake-logging.wikimedia.org/v1/events")!)
/**
* Serial dispatch queue that enables working with properties in a thread-safe way
......@@ -90,28 +94,13 @@ public class MetricsClient {
private let dateFormatter = ISO8601DateFormatter()
/**
* Where to send events to for intake
*
* See [wikitech:Event Platform/EventGate](https://wikitech.wikimedia.org/wiki/Event_Platform/EventGate)
* for more information. Specifically, the section on **eventgate-analytics-external**. This service uses the stream
* configurations from Meta wiki as its source of truth.
*/
private let eventIntakeURI = URL(string: "https://intake-analytics.wikimedia.org/v1/events")!
/**
* MediaWiki API endpoint which returns stream configurations as JSON
*
* Streams are configured via [mediawiki-config/wmf-config/InitialiseSettings.php](https://gerrit.wikimedia.org/g/operations/mediawiki-config/+/master/wmf-config/InitialiseSettings.php)
* and made available for external consumption via MediaWiki API via [Extension:EventStreamConfig](https://gerrit.wikimedia.org/g/mediawiki/extensions/EventStreamConfig/)
*
* In production, we use [Meta wiki](https://meta.wikimedia.org/wiki/Main_Page)
* [streamconfigs endpoint](https://meta.wikimedia.org/w/api.php?action=help&modules=streamconfigs)
* with the constraint that the `destination_event_service` is configured to
* be "eventgate-analytics-external" (to filter out irrelevant streams from
* the returned list of stream configurations).
*/
private let streamConfigsURI = URL(string: "https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&constraints=destination_event_service=eventgate-analytics-external")!
private let streamConfigsURI = URL(string: "https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&all_settings")!
/**
* Dictionary of stream configurations keyed by stream name.
......@@ -186,7 +175,7 @@ public class MetricsClient {
queue.async {
if self.streamConfigs == nil {
self.inputBuffer.append(event)
self.unvalidatedEvents.append(event)
return
}
guard let config = self.streamConfigs?[stream] else {
......@@ -197,7 +186,13 @@ public class MetricsClient {
NSLog("MetricsClient: Stream '\(stream)' is not in sample")
return
}
self.outputBuffer.append(event)
if config.destination == DestinationEventService.analytics {
self.validatedEvents.append(event)
} else if config.destination == DestinationEventService.errorLogging {
self.validatedErrors.append(event)
} else {
// TODO: Unknown destination service; Log a client error
}
}
}
......@@ -262,61 +257,69 @@ public class MetricsClient {
NSLog("MetricsClient: Problem processing JSON payload from response: \(error)")
}
self.processInputBuffer()
self.processUnvalidatedEvents()
}
}
/**
* Process input buffer upon stream configs becoming available.
* Process unvalidated events upon stream configs becoming available.
*/
private func processInputBuffer() {
private func processUnvalidatedEvents() {
queue.sync {
while let event = self.inputBuffer.popFirst() {
while let event = self.unvalidatedEvents.popFirst() {
guard let config = streamConfigs?[event.meta.stream] else {
continue
}
guard samplingController.inSample(stream: event.meta.stream, config: config) else {
continue
}
self.outputBuffer.append(event)
if config.destination == DestinationEventService.analytics {
self.validatedEvents.append(event)
} else if config.destination == DestinationEventService.errorLogging {
self.validatedErrors.append(event)
} else {
// TODO: Unknown destination service; Log a client error
}
}
}
}
private func postAllScheduled(_ completion: (() -> Void)? = nil) {
encodeQueue.async {
NSLog("MetricsClient: Posting all scheduled events")
[self.validatedEvents, self.validatedErrors].forEach { buffer in
NSLog("MetricsClient: Posting all scheduled events")
var eventsToSend: [Event] = []
while let event = self.outputBuffer.popFirst() {
eventsToSend.append(event)
}
var data: Data
do {
data = try self.encoder.encode(eventsToSend)
} catch {
NSLog("MetricsClient: Error encoding events pending submission")
return
}
var eventsToSend: [Event] = []
while let event = buffer.popFirst() {
eventsToSend.append(event)
}
#if DEBUG
if let jsonString = String(data: data, encoding: .utf8) {
NSLog("MetricsClient: Sending event with POST body:\n\(jsonString)")
}
#endif
var data: Data
do {
data = try self.encoder.encode(eventsToSend)
} catch {
NSLog("MetricsClient: Error encoding events pending submission")
return
}
self.integration.httpPost(self.eventIntakeURI, body: data) { result in
switch result {
case .success:
NSLog("MetricsClient: Event submission succeeded")
break
case .failure:
// Re-enqueue event to be retried on next POST attempt
for failedEvent in eventsToSend.reversed() {
self.outputBuffer.prepend(failedEvent)
#if DEBUG
if let jsonString = String(data: data, encoding: .utf8) {
NSLog("MetricsClient: Sending event with POST body:\n\(jsonString)")
}
#endif
self.integration.httpPost(buffer.destination, body: data) { result in
switch result {
case .success:
NSLog("MetricsClient: Event submission succeeded")
break
case .failure:
// Re-enqueue event to be retried on next POST attempt
for failedEvent in eventsToSend.reversed() {
buffer.prepend(failedEvent)
}
NSLog("MetricsClient: Event submission failed; re-enqueuing to retry later")
}
NSLog("MetricsClient: Event submission failed; re-enqueuing to retry later")
}
}
}
......
......@@ -4,6 +4,7 @@ import Foundation
struct StreamConfig: Decodable {
var stream: String
var schema: String
var destination: DestinationEventService
var producerConfig: ProducerConfig?
func getSamplingConfig() -> SamplingConfig? {
......@@ -13,6 +14,7 @@ struct StreamConfig: Decodable {
enum CodingKeys: String, CodingKey {
case stream
case schema = "schema_title"
case destination = "destination_event_service"
case producerConfig = "producers"
}
......
......@@ -39,7 +39,12 @@ final class ContextControllerTests: XCTestCase {
]
let clientConfig = StreamConfig.ProducerConfig.MetricsPlatformClientConfig(requestedValues: requestedValues)
let producerConfig = StreamConfig.ProducerConfig(clientConfig: clientConfig)
let streamConfig = StreamConfig(stream: "test.event", schema: "test/event", producerConfig: producerConfig)
let streamConfig = StreamConfig(
stream: "test.event",
schema: "test/event",
destination: DestinationEventService.analytics,
producerConfig: producerConfig
)
self.streamConfigs = ["test.event": streamConfig]
}
......
......@@ -16,10 +16,18 @@ final class SamplingControllerTests: XCTestCase {
let samplingConfig = SamplingConfig(rate: 0.5, identifier: "session")
let clientConfig = StreamConfig.ProducerConfig.MetricsPlatformClientConfig(samplingConfig: samplingConfig)
let producerConfig = StreamConfig.ProducerConfig(clientConfig: clientConfig)
let streamConfigWithSamplingConfig = StreamConfig(stream: "test.event", schema: "test/event", producerConfig: producerConfig)
let streamConfigWithoutSamplingConfig = StreamConfig(stream: "test.event", schema: "test/event", producerConfig: nil)
let streamConfigWithSamplingConfig = StreamConfig(
stream: "test.event",
schema: "test/event",
destination: DestinationEventService.analytics,
producerConfig: producerConfig
)
let streamConfigWithoutSamplingConfig = StreamConfig(
stream: "test.event",
schema: "test/event",
destination: DestinationEventService.analytics,
producerConfig: nil
)
self.streamConfigs = [
"hasSamplingConfig": streamConfigWithSamplingConfig,
"noSamplingConfig": streamConfigWithoutSamplingConfig
......
......@@ -9,13 +9,14 @@ final class StreamConfigTests: XCTestCase {
\"producers\": {
\"metrics_platform_client\": {
\"sampling\": {
\"identifier\":\"device\",
\"rate\":0.1
\"identifier\": \"device\",
\"rate\": 0.1
}
}
},
\"schema_title\":\"test\\/event\",
\"stream\":\"test.event\"
\"schema_title\": \"test\\/event\",
\"stream\": \"test.event\",
\"destination_event_service\": \"eventgate-analytics-external\"
}
"""
guard let data = json.data(using: .utf8) else {
......@@ -26,10 +27,11 @@ final class StreamConfigTests: XCTestCase {
let streamConfig = try JSONDecoder().decode(StreamConfig.self, from: data)
XCTAssertEqual(streamConfig.stream, "test.event")
XCTAssertEqual(streamConfig.schema, "test/event")
XCTAssertEqual(streamConfig.destination, .analytics)
XCTAssertEqual(streamConfig.getSamplingConfig()?.rate, 0.1)
XCTAssertEqual(streamConfig.getSamplingConfig()?.identifier, "device")
} catch {
XCTFail("Failed to encode or decode stream config")
XCTFail("Failed to encode or decode stream config: \(error)")
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment