Commit graph

4 commits

Author SHA1 Message Date
Joe Reuter
b1d85aa6a8
🌊 Streams: Add streams feature privilege (#218966)
This PR adds a new feature for streams to control whether the UI and API
are available.

Changes:
* Add the feature with two privileges (`show` and `manage_assets`)
* Can be configured with the classic `none`/`read`/`all` so it's
automatically aligned with serverless editor/viewer permissions
* `none` also means the app is not shown - to achieve this, the existing
`status$` observable also checks for the `streams.show` capability
* Only guards changes to the linked dashboards - changes at the
Elasticsearch level are still delegated to the user's Elasticsearch-level
permissions
* This happens on the UI level (disabled button and dashboard selection
on the dashboard page)
* and on the API level (all endpoints that can change linked dashboards
require the permission)
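As a rough sketch of how the classic levels could map onto the two privileges (the types and the mapping function below are illustrative, not the actual Kibana feature registration code):

```typescript
// Illustrative mapping from the classic privilege levels to the two
// streams privileges described above. The names `show` / `manage_assets`
// come from the PR text; the function itself is a simplified sketch.
type PrivilegeLevel = 'none' | 'read' | 'all';

interface StreamsCapabilities {
  show: boolean;
  manage_assets: boolean;
}

function capabilitiesFor(level: PrivilegeLevel): StreamsCapabilities {
  switch (level) {
    case 'none':
      // `none` hides the app entirely: the status$ check sees show === false.
      return { show: false, manage_assets: false };
    case 'read':
      // Viewers can open the UI but cannot change linked dashboards.
      return { show: true, manage_assets: false };
    case 'all':
      // Editors get both privileges.
      return { show: true, manage_assets: true };
  }
}
```

This keeps the privilege automatically aligned with serverless editor/viewer roles, since those map onto `all`/`read` respectively.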


# Questions

* Not sure about the name `manage_assets` - maybe it should be something
else
* Not sure about how the queries stuff should work - @kdelemme anything
we should do in this area?

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
2025-04-28 11:50:05 +02:00
Milton Hultgren
fa23a90d80
🌊 Refactor API control flow for stream management (#211696)
### Background
This PR is a proposal for a different way to structure the Streams code
flow based on some challenges faced while working on
https://github.com/elastic/streams-program/issues/26 and discussed
[here](https://github.com/elastic/streams-program/discussions/147) and
[here](https://github.com/elastic/streams-program/discussions/55),
mainly around finding it difficult to decide where to place certain
validations that need access to the state as a whole.
It is also a response to reported difficulty with adding new stream
types to the code base.

It aims to achieve 3 goals:
1. It is easy to add new stream types and there is a clear place where
changes (new validation, new logic) for existing stream types happen,
making the code easier to evolve over time
2. It is easier to improve the robustness of the system because there
are clear phases where problems can be caught, fixed and rolled back
3. It lays some ground work for features such as [bulk
changes](https://github.com/elastic/streams-program/issues/125), [dry
runs](https://github.com/elastic/streams-program/discussions/138) and a
[health
endpoint](https://github.com/elastic/streams-program/discussions/139)

In the future, this will most likely be handled by Elasticsearch to a
large degree, as imagined in
https://github.com/elastic/streams-program/discussions/30

The solution takes inspiration from the reconciliation / controller
pattern that Kubernetes uses, where users specify a desired state and
the system takes action towards reaching that state. But it is also
somewhat similar to how React's Virtual DOM works, in that
reconciliation happens in a single pass.

Another key pattern is the [Active Record
pattern](https://www.martinfowler.com/eaaCatalog/activeRecord.html):
each stream class contains all the logic for how to validate and
modify that stream in Elasticsearch. The client and `State` class simply
orchestrate the flow but defer all actual work and decision making to
the stream classes.

**Note:** This PoC ignores the management of assets 

### Summary

The process takes the following steps:
1. A route accepts a request (upsert / delete) and translates it into
one or more (for bulk) `StreamChange` objects before passing these to
the `State.applyChanges` method (which also takes a dry-run flag)
2. The current state of Streams is loaded by using the `State` class
3. The changes are then applied to the current state to derive the
desired state [1]
4. The desired state is then validated by asking each individual stream
whether, given the desired and starting states, it is in a valid state
from its own perspective (upserted or deleted correctly)
5. If the state is invalid, we return those errors and stop
6. Otherwise we continue: if it's a dry run, we ask the desired state
object what has changed and report that in the shape of the
Elasticsearch actions that would be attempted
7. Otherwise we proceed to commit the changes to Elasticsearch by asking
each changed stream which Elasticsearch actions need to be performed to
reach the desired state
8. These actions are then combined and sent to the `ExecutionPlan` class,
which does planning (mainly for actions around Unwired streams) and then
executes the actions with as much parallelism as possible while
preserving a safe order
9. If any error happens, we attempt to revert to the starting state by
taking the changed streams, marking each stream as created based on the
starting state, and then deriving and applying the corresponding
Elasticsearch actions
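The validate / dry-run / commit phases above can be sketched roughly as follows. All interfaces and names here are simplified stand-ins, not the real `State` or `ExecutionPlan` implementations:

```typescript
// Simplified sketch of the applyChanges control flow (steps 4-8 above).
// Each stream is asked to validate itself and to plan its own
// Elasticsearch actions; the orchestrator just sequences the phases.
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

interface Stream {
  name: string;
  validate(): ValidationResult;
  // Elasticsearch actions needed to move this stream to its desired state.
  planActions(): string[];
}

interface ApplyResult {
  status: 'invalid' | 'dry_run' | 'committed';
  errors: string[];
  actions: string[];
}

function applyChanges(streams: Stream[], opts: { dryRun: boolean }): ApplyResult {
  // Validation phase: collect errors from every changed stream.
  const errors = streams.flatMap((s) => s.validate().errors);
  if (errors.length > 0) {
    return { status: 'invalid', errors, actions: [] };
  }
  // Planning phase: each stream reports the actions it would perform.
  const actions = streams.flatMap((s) => s.planActions());
  if (opts.dryRun) {
    // Dry run: only report the actions that would be attempted.
    return { status: 'dry_run', errors: [], actions };
  }
  // A real implementation would hand `actions` to an execution planner
  // here and attempt a rollback to the starting state on failure (step 9).
  return { status: 'committed', errors: [], actions };
}
```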

This PR also changes our `resync` endpoint to make use of the same rough
strategy (load current state, mark all as created, get Elasticsearch
actions and apply).

[1] Applying changes:

1. The current state is first cloned
2. Then for each change we check whether it is a deletion or an upsert
3. Based on this we either mark existing streams for deletion or
create/update streams
4. When creating a new stream instance we use the helper
`streamFromDefinition`, which is the only mapping between the definition
documents and the Active Record-style stream type classes
5. As part of this, each stream that changes is marked in the desired
state
6. The stream is passed the desired and current state and should update
itself based on the change
7. The stream can return a set of cascading changes (in the same format
as the requested changes) which are executed directly afterwards; we
limit how many rounds of cascading changes can happen in order to
avoid infinite loops
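The cascading-change round limit could look roughly like this. The change shape, handler signature, and the round limit itself are illustrative assumptions:

```typescript
// Sketch of applying changes round by round: each applied change may
// return follow-up (cascading) changes, and we bail out after a fixed
// number of rounds to avoid infinite cascades.
interface Change {
  target: string;
}

const MAX_CASCADE_ROUNDS = 10; // illustrative bound, not the real constant

function applyWithCascades(
  initial: Change[],
  apply: (change: Change) => Change[] // returns any cascading changes
): number {
  let pending = initial;
  let rounds = 0;
  while (pending.length > 0) {
    if (rounds >= MAX_CASCADE_ROUNDS) {
      throw new Error('Exceeded cascading change limit');
    }
    // Apply every pending change; collect the cascades for the next round.
    pending = pending.flatMap(apply);
    rounds++;
  }
  return rounds;
}
```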

### Adding new stream types

Key in all of this is that the client and `State` classes don't know
anything about any of the specific stream types; they only know the
`StreamActiveRecord` interface.
When adding a new stream type you need to implement this interface and
update `streamFromDefinition` to create the right class for your new
definition. Streams of different types should only interact with each
other by creating cascading changes.
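A minimal sketch of this shape (the method names, the definition format, and the concrete classes below are assumptions for illustration; the real interface differs):

```typescript
// The orchestration code only sees this interface, and
// streamFromDefinition is the single place that maps definition
// documents to concrete Active Record-style classes.
interface StreamDefinition {
  name: string;
  type: 'wired' | 'group'; // the real code also has e.g. unwired streams
}

interface StreamActiveRecord {
  definition: StreamDefinition;
  validate(): string[]; // validation errors, empty when valid
  planElasticsearchActions(): string[];
}

class WiredStream implements StreamActiveRecord {
  constructor(public definition: StreamDefinition) {}
  validate(): string[] {
    return [];
  }
  planElasticsearchActions(): string[] {
    return [`upsert_pipeline:${this.definition.name}`];
  }
}

class GroupStream implements StreamActiveRecord {
  constructor(public definition: StreamDefinition) {}
  validate(): string[] {
    return [];
  }
  planElasticsearchActions(): string[] {
    return []; // group streams have no direct Elasticsearch footprint here
  }
}

// The only switch on stream type: adding a new type means implementing
// the interface and adding a case here.
function streamFromDefinition(def: StreamDefinition): StreamActiveRecord {
  switch (def.type) {
    case 'wired':
      return new WiredStream(def);
    case 'group':
      return new GroupStream(def);
  }
}
```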

### Possible follow up tasks
- Introduce a lazy Elasticsearch cluster state cache, because multiple
places in the code repeatedly fetch the same data
- Make API endpoints that consume `attemptChanges` pass back the
`DesiredState` and planned `ElasticsearchActions` as debug information
based on a flag (maybe also all cascading changes)
- Don't run cascading changes by default but run them if _some_ flag is
submitted based on
https://github.com/elastic/streams-program/discussions/230
- Wrap `attemptChanges` and `resync` with the new LockManager
https://github.com/elastic/kibana/pull/216397
- Unit test WiredStream, UnwiredStream and GroupStream
- Clean up old sync helpers 
- Wrap ES calls to get better stack traces for errors

### Out of scope

- Asset linking and content pack installation (it's probably okay for
these to continue to use the asset client directly since there is less
domain logic and no cascading changes involved)

---------

Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
2025-04-08 13:02:56 +02:00
Chris Cowan
508c89fc15
[Streams 🌊] Add initial telemetry to Streams (#214027)
## Summary

This PR adds EBT telemetry to the Streams backend and UI. For the
backend APIs, we are only measuring latency on the "write" endpoints
that have the highest potential for being slow:

- `POST /api/streams/{name}/processing/_simulate 2023-10-31`
- `POST /api/streams/{name}/processing/_suggestions 2023-10-31`
- `POST /api/streams/{name}/_fork 2023-10-31`
- `PUT /api/streams/{name}/dashboards/{dashboardId} 2023-10-31`
- `PUT /api/streams/{name} 2023-10-31`
- `PUT /api/streams/{name}/_group 2023-10-31`
- `PUT /api/streams/{name}/_ingest 2023-10-31`
- `DELETE /api/streams/{name} 2023-10-31`
- `POST /api/streams/_enable 2023-10-31`
- `POST /api/streams/_disable 2023-10-31`
- `POST /api/streams/_resync 2023-10-31`

This is controlled by an allow list located in the server's telemetry
client.
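Conceptually, the allow list amounts to something like the following. The helper names and the wrapper are assumptions, not the actual telemetry client code; the endpoint strings match the list above:

```typescript
// Illustrative allow list for latency telemetry: only endpoints in this
// set get their latency reported.
const TRACKED_ENDPOINTS = new Set([
  'POST /api/streams/_enable 2023-10-31',
  'POST /api/streams/_disable 2023-10-31',
  'POST /api/streams/_resync 2023-10-31',
  'PUT /api/streams/{name} 2023-10-31',
  // ...the remaining write endpoints from the list above
]);

function shouldTrackLatency(endpoint: string): boolean {
  return TRACKED_ENDPOINTS.has(endpoint);
}

// Hypothetical wrapper: measure and report latency only for
// allow-listed endpoints, leaving everything else untouched.
async function withLatencyTelemetry<T>(
  endpoint: string,
  handler: () => Promise<T>,
  report: (endpoint: string, ms: number) => void
): Promise<T> {
  if (!shouldTrackLatency(endpoint)) return handler();
  const start = Date.now();
  try {
    return await handler();
  } finally {
    report(endpoint, Date.now() - start);
  }
}
```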

For the UI, I've added the following tracking:

- **Number of Dashboards associated with a Stream** – This captures the
stream name and the number of dashboards associated with it. It's
attached to the dashboard hook, which means we only track dashboard
counts on streams that users are actually using.
- **Clicks on Dashboards** – This captures the name of the dashboard,
the dashboard id, and the stream name
- **Latency of AI Grok Suggestions** – This captures the name of the
stream, the field used, the number of suggested patterns, the success
rates of the suggested patterns, and the connector id of the LLM. It
also captures when the AI returns zero suggestions.
- **When a user accepts an AI Grok Suggestion** – This captures the name
of the stream, the name of the field, the success rate of the pattern,
and the number of fields detected.
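As an illustration of the "success rate" metric, it could be computed as the fraction of sampled messages that a suggested pattern matches. This is a guess at the semantics, using a `RegExp` as a stand-in for a real Grok pattern:

```typescript
// Hypothetical success-rate computation for a suggested pattern over a
// set of sample messages (0 when there are no samples).
function patternSuccessRate(pattern: RegExp, samples: string[]): number {
  if (samples.length === 0) return 0;
  const matched = samples.filter((msg) => pattern.test(msg)).length;
  return matched / samples.length;
}
```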

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
2025-03-17 19:34:52 +00:00
Joe Reuter
8a9bb363aa
🌊 Move streams to platform (#211893)
This PR moves the `streams` and `streams_app` plugins into platform so
they can be used in other solutions in the future. This PR is not
actually making it available in other solutions yet since we are still
discussing the release plans.

## Inlined helpers

As discussed before, this PR inlines a couple of simple helpers (query
building, time zone normalization, a header portal helper, and a React
integration hook for the data plugin's timefilter state), as there is no
good place for these outside of the observability solution.

## streams_app plugin

The streams_app plugin does not actually register anything; instead it
exports a component that renders the app, which needs to be consumed by
another plugin to turn it into a registered app - for now,
`observability_streams_wrapper` takes over this job.

## observability_streams_wrapper plugin

While 99% of the streams logic is moved into
`platform/shared/streams_app`, two bits are left behind in
`observability_streams_wrapper`:
* The actual app registration
* Integration with the observability_shared `PageTemplate` component

Once we decide streams should be displayed outside of the observability
solution, it's probably no longer necessary to decouple app definition
and registration like this, because the app will always be visible no
matter the solution. Once this is the case, the navigation registration
can be moved into the central `observability` plugin, like it's handled
for other apps such as infra.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
2025-02-21 16:34:50 +01:00
Renamed from x-pack/solutions/observability/plugins/streams/server/plugin.ts