-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcapability.ts
420 lines (361 loc) · 13.7 KB
/
capability.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2023-Present The Pepr Authors
import { GenericClass, GroupVersionKind, modelToGroupVersionKind } from "kubernetes-fluent-client";
import { pickBy } from "ramda";
import Log from "../telemetry/logger";
import { isBuildMode, isDevMode, isWatchMode } from "./envChecks";
import { PeprStore, Storage } from "./storage";
import { OnSchedule, Schedule } from "./schedule";
import { Event } from "../enums";
import {
Binding,
BindingFilter,
BindingWithName,
CapabilityCfg,
CapabilityExport,
MutateAction,
MutateActionChain,
ValidateAction,
ValidateActionChain,
WatchLogAction,
FinalizeAction,
FinalizeActionChain,
WhenSelector,
} from "../types";
import { addFinalizer } from "../finalizer";
const registerAdmission = isBuildMode() || !isWatchMode();
const registerWatch = isBuildMode() || isWatchMode() || isDevMode();
/**
* A capability is a unit of functionality that can be registered with the Pepr runtime.
*/
export class Capability implements CapabilityExport {
#name: string;
#description: string;
#namespaces?: string[] | undefined;
#bindings: Binding[] = [];
#store = new Storage();
#scheduleStore = new Storage();
#registered = false;
#scheduleRegistered = false;
hasSchedule: boolean;
/**
* Run code on a schedule with the capability.
*
* @param schedule The schedule to run the code on
* @returns
*/
OnSchedule: (schedule: Schedule) => void = (schedule: Schedule) => {
const { name, every, unit, run, startTime, completions } = schedule;
this.hasSchedule = true;
if (process.env.PEPR_WATCH_MODE === "true" || process.env.PEPR_MODE === "dev") {
// Only create/watch schedule store if necessary
// Create a new schedule
const newSchedule: Schedule = {
name,
every,
unit,
run,
startTime,
completions,
};
this.#scheduleStore.onReady(() => {
new OnSchedule(newSchedule).setStore(this.#scheduleStore);
});
}
};
public getScheduleStore(): Storage {
return this.#scheduleStore;
}
/**
* Store is a key-value data store that can be used to persist data that should be shared
* between requests. Each capability has its own store, and the data is persisted in Kubernetes
* in the `pepr-system` namespace.
*
* Note: You should only access the store from within an action.
*/
Store: PeprStore = {
clear: this.#store.clear,
getItem: this.#store.getItem,
removeItem: this.#store.removeItem,
removeItemAndWait: this.#store.removeItemAndWait,
setItem: this.#store.setItem,
subscribe: this.#store.subscribe,
onReady: this.#store.onReady,
setItemAndWait: this.#store.setItemAndWait,
};
/**
* ScheduleStore is a key-value data store used to persist schedule data that should be shared
* between intervals. Each Schedule shares store, and the data is persisted in Kubernetes
* in the `pepr-system` namespace.
*
* Note: There is no direct access to schedule store
*/
ScheduleStore: PeprStore = {
clear: this.#scheduleStore.clear,
getItem: this.#scheduleStore.getItem,
removeItemAndWait: this.#scheduleStore.removeItemAndWait,
removeItem: this.#scheduleStore.removeItem,
setItemAndWait: this.#scheduleStore.setItemAndWait,
setItem: this.#scheduleStore.setItem,
subscribe: this.#scheduleStore.subscribe,
onReady: this.#scheduleStore.onReady,
};
get bindings(): Binding[] {
return this.#bindings;
}
get name(): string {
return this.#name;
}
get description(): string {
return this.#description;
}
get namespaces(): string[] {
return this.#namespaces || [];
}
constructor(cfg: CapabilityCfg) {
this.#name = cfg.name;
this.#description = cfg.description;
this.#namespaces = cfg.namespaces;
this.hasSchedule = false;
Log.info(`Capability ${this.#name} registered`);
Log.debug(cfg);
}
/**
* Register the store with the capability. This is called automatically by the Pepr controller.
*/
registerScheduleStore = (): Storage => {
Log.info(`Registering schedule store for ${this.#name}`);
if (this.#scheduleRegistered) {
throw new Error(`Schedule store already registered for ${this.#name}`);
}
this.#scheduleRegistered = true;
// Pass back any ready callback to the controller
return this.#scheduleStore;
};
/**
* Register the store with the capability. This is called automatically by the Pepr controller.
*
* @param store
*/
registerStore = (): Storage => {
Log.info(`Registering store for ${this.#name}`);
if (this.#registered) {
throw new Error(`Store already registered for ${this.#name}`);
}
this.#registered = true;
// Pass back any ready callback to the controller
return this.#store;
};
/**
* The When method is used to register a action to be executed when a Kubernetes resource is
* processed by Pepr. The action will be executed if the resource matches the specified kind and any
* filters that are applied.
*
* @param model the KubernetesObject model to match
* @param kind if using a custom KubernetesObject not available in `a.*`, specify the GroupVersionKind
* @returns
*/
When = <T extends GenericClass>(model: T, kind?: GroupVersionKind): WhenSelector<T> => {
const matchedKind = modelToGroupVersionKind(model.name);
// If the kind is not specified and the model is not a KubernetesObject, throw an error
if (!matchedKind && !kind) {
throw new Error(`Kind not specified for ${model.name}`);
}
const binding: Binding = {
model,
// If the kind is not specified, use the matched kind from the model
kind: kind || matchedKind,
event: Event.ANY,
filters: {
name: "",
namespaces: [],
regexNamespaces: [],
regexName: "",
labels: {},
annotations: {},
deletionTimestamp: false,
},
};
const bindings = this.#bindings;
const prefix = `${this.#name}: ${model.name}`;
const commonChain = { WithLabel, WithAnnotation, WithDeletionTimestamp, Mutate, Validate, Watch, Reconcile, Alias };
type CommonChainType = typeof commonChain;
type ExtendedCommonChainType = CommonChainType & {
Alias: (alias: string) => CommonChainType;
InNamespace: (...namespaces: string[]) => BindingWithName<T>;
InNamespaceRegex: (...namespaces: RegExp[]) => BindingWithName<T>;
WithName: (name: string) => BindingFilter<T>;
WithNameRegex: (regexName: RegExp) => BindingFilter<T>;
WithDeletionTimestamp: () => BindingFilter<T>;
};
const isNotEmpty = (value: object): boolean => Object.keys(value).length > 0;
const log = (message: string, cbString: string): void => {
const filteredObj = pickBy(isNotEmpty, binding.filters);
Log.info(`${message} configured for ${binding.event}`, prefix);
Log.info(filteredObj, prefix);
Log.debug(cbString, prefix);
};
function Validate(validateCallback: ValidateAction<T>): ValidateActionChain<T> {
if (registerAdmission) {
log("Validate Action", validateCallback.toString());
// Create the child logger
const aliasLogger = Log.child({ alias: binding.alias || "no alias provided" });
// Push the binding to the list of bindings for this capability as a new BindingAction
// with the callback function to preserve
bindings.push({
...binding,
isValidate: true,
validateCallback: async (req, logger = aliasLogger) => {
Log.info(`Executing validate action with alias: ${binding.alias || "no alias provided"}`);
return await validateCallback(req, logger);
},
});
}
return { Watch, Reconcile };
}
function Mutate(mutateCallback: MutateAction<T>): MutateActionChain<T> {
if (registerAdmission) {
log("Mutate Action", mutateCallback.toString());
// Create the child logger
const aliasLogger = Log.child({ alias: binding.alias || "no alias provided" });
// Push the binding to the list of bindings for this capability as a new BindingAction
// with the callback function to preserve
bindings.push({
...binding,
isMutate: true,
mutateCallback: async (req, logger = aliasLogger) => {
Log.info(`Executing mutation action with alias: ${binding.alias || "no alias provided"}`);
await mutateCallback(req, logger);
},
});
}
// Now only allow adding actions to the same binding
return { Watch, Validate, Reconcile };
}
function Watch(watchCallback: WatchLogAction<T>): FinalizeActionChain<T> {
if (registerWatch) {
log("Watch Action", watchCallback.toString());
// Create the child logger and cast it to the expected type
const aliasLogger = Log.child({ alias: binding.alias || "no alias provided" }) as typeof Log;
// Push the binding to the list of bindings for this capability as a new BindingAction
// with the callback function to preserve
bindings.push({
...binding,
isWatch: true,
watchCallback: async (update, phase, logger = aliasLogger) => {
Log.info(`Executing watch action with alias: ${binding.alias || "no alias provided"}`);
await watchCallback(update, phase, logger);
},
});
}
return { Finalize };
}
function Reconcile(reconcileCallback: WatchLogAction<T>): FinalizeActionChain<T> {
if (registerWatch) {
log("Reconcile Action", reconcileCallback.toString());
// Create the child logger and cast it to the expected type
const aliasLogger = Log.child({ alias: binding.alias || "no alias provided" }) as typeof Log;
// Push the binding to the list of bindings for this capability as a new BindingAction
// with the callback function to preserve
bindings.push({
...binding,
isWatch: true,
isQueue: true,
watchCallback: async (update, phase, logger = aliasLogger) => {
Log.info(`Executing reconcile action with alias: ${binding.alias || "no alias provided"}`);
await reconcileCallback(update, phase, logger);
},
});
}
return { Finalize };
}
function Finalize(finalizeCallback: FinalizeAction<T>): void {
log("Finalize Action", finalizeCallback.toString());
// Create the child logger and cast it to the expected type
const aliasLogger = Log.child({ alias: binding.alias || "no alias provided" }) as typeof Log;
// Add binding to inject Pepr finalizer during admission (Mutate)
if (registerAdmission) {
const mutateBinding = {
...binding,
isMutate: true,
isFinalize: true,
event: Event.ANY,
mutateCallback: addFinalizer,
};
bindings.push(mutateBinding);
}
// Add binding to process finalizer callback / remove Pepr finalizer (Watch)
if (registerWatch) {
const watchBinding = {
...binding,
isWatch: true,
isFinalize: true,
event: Event.UPDATE,
finalizeCallback: async (update: InstanceType<T>, logger = aliasLogger): Promise<boolean | void> => {
Log.info(`Executing finalize action with alias: ${binding.alias || "no alias provided"}`);
return await finalizeCallback(update, logger);
},
};
bindings.push(watchBinding);
}
}
function InNamespace(...namespaces: string[]): BindingWithName<T> {
Log.debug(`Add namespaces filter ${namespaces}`, prefix);
binding.filters.namespaces.push(...namespaces);
return { ...commonChain, WithName, WithNameRegex };
}
function InNamespaceRegex(...namespaces: RegExp[]): BindingWithName<T> {
Log.debug(`Add regex namespaces filter ${namespaces}`, prefix);
binding.filters.regexNamespaces.push(...namespaces.map(regex => regex.source));
return { ...commonChain, WithName, WithNameRegex };
}
function WithDeletionTimestamp(): BindingFilter<T> {
Log.debug("Add deletionTimestamp filter");
binding.filters.deletionTimestamp = true;
return commonChain;
}
function WithNameRegex(regexName: RegExp): BindingFilter<T> {
Log.debug(`Add regex name filter ${regexName}`, prefix);
binding.filters.regexName = regexName.source;
return commonChain;
}
function WithName(name: string): BindingFilter<T> {
Log.debug(`Add name filter ${name}`, prefix);
binding.filters.name = name;
return commonChain;
}
function WithLabel(key: string, value = ""): BindingFilter<T> {
Log.debug(`Add label filter ${key}=${value}`, prefix);
binding.filters.labels[key] = value;
return commonChain;
}
function WithAnnotation(key: string, value = ""): BindingFilter<T> {
Log.debug(`Add annotation filter ${key}=${value}`, prefix);
binding.filters.annotations[key] = value;
return commonChain;
}
function Alias(alias: string): CommonChainType {
Log.debug(`Adding prefix alias ${alias}`, prefix);
binding.alias = alias;
return commonChain;
}
function bindEvent(event: Event): ExtendedCommonChainType {
binding.event = event;
return {
...commonChain,
InNamespace,
InNamespaceRegex,
WithName,
WithNameRegex,
WithDeletionTimestamp,
Alias,
};
}
return {
IsCreatedOrUpdated: () => bindEvent(Event.CREATE_OR_UPDATE),
IsCreated: () => bindEvent(Event.CREATE),
IsUpdated: () => bindEvent(Event.UPDATE),
IsDeleted: () => bindEvent(Event.DELETE),
};
};
}