scylla_cql/frame/response/
event.rs

1//! CQL protocol-level representation of an `EVENT` response.
2
3use crate::frame::frame_errors::{
4    ClusterChangeEventParseError, CqlEventParseError, SchemaChangeEventParseError,
5};
6use crate::frame::server_event_type::EventType;
7use crate::frame::types;
8use std::net::SocketAddr;
9
10/// Event that the server notified the client about.
11#[derive(Debug)]
12// Check triggers because all variants end with "Change".
13// TODO(2.0): Remove the "Change" postfix from variants.
14#[expect(clippy::enum_variant_names)]
15pub enum Event {
16    /// Topology changed.
17    TopologyChange(TopologyChangeEvent),
18    /// Status of a node changed.
19    StatusChange(StatusChangeEvent),
20    /// Schema changed.
21    SchemaChange(SchemaChangeEvent),
22}
23
24/// Event that notifies about changes in the cluster topology.
25#[derive(Debug)]
26pub enum TopologyChangeEvent {
27    /// A new node was added to the cluster.
28    NewNode(SocketAddr),
29    /// A node was removed from the cluster.
30    RemovedNode(SocketAddr),
31}
32
33/// Event that notifies about changes in the nodes' status.
34#[derive(Debug)]
35pub enum StatusChangeEvent {
36    /// A node went up.
37    Up(SocketAddr),
38    /// A node went down.
39    Down(SocketAddr),
40}
41
42/// Event that notifies about changes in the cluster topology.
43#[derive(Debug)]
44// Check triggers because all variants end with "Change".
45// TODO(2.0): Remove the "Change" postfix from variants.
46#[expect(clippy::enum_variant_names)]
47pub enum SchemaChangeEvent {
48    /// Keyspace was altered.
49    KeyspaceChange {
50        /// Type of change that was made to the keyspace.
51        change_type: SchemaChangeType,
52        /// Name of the keyspace that was altered.
53        keyspace_name: String,
54    },
55    /// Table was altered.
56    TableChange {
57        /// Type of change that was made to the table.
58        change_type: SchemaChangeType,
59        /// Name of the keyspace that contains the table.
60        keyspace_name: String,
61        /// Name of the table that was altered.
62        object_name: String,
63    },
64    /// Type was altered.
65    TypeChange {
66        /// Type of change that was made to the type.
67        change_type: SchemaChangeType,
68        /// Name of the keyspace that contains the type.
69        keyspace_name: String,
70        /// Name of the type that was altered.
71        type_name: String,
72    },
73    /// Function was altered.
74    FunctionChange {
75        /// Type of change that was made to the function.
76        change_type: SchemaChangeType,
77        /// Name of the keyspace that contains the function.
78        keyspace_name: String,
79        /// Name of the function that was altered.
80        function_name: String,
81        /// List of argument types of the function that was altered.
82        arguments: Vec<String>,
83    },
84    /// Aggregate was altered.
85    AggregateChange {
86        /// Type of change that was made to the aggregate.
87        change_type: SchemaChangeType,
88        /// Name of the keyspace that contains the aggregate.
89        keyspace_name: String,
90        /// Name of the aggregate that was altered.
91        aggregate_name: String,
92        /// List of argument types of the aggregate that was altered.
93        arguments: Vec<String>,
94    },
95}
96
97/// Type of change that was made to the schema.
98#[derive(Debug)]
99pub enum SchemaChangeType {
100    /// The affected schema item was created.
101    Created,
102
103    /// The affected schema item was updated.
104    Updated,
105
106    /// The affected schema item was dropped.
107    Dropped,
108
109    /// A placeholder for an invalid schema change type.
110    Invalid,
111}
112
113impl Event {
114    /// Deserialize an event from the provided buffer.
115    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, CqlEventParseError> {
116        let event_type: EventType = types::read_string(buf)
117            .map_err(CqlEventParseError::EventTypeParseError)?
118            .parse()?;
119        match event_type {
120            EventType::TopologyChange => Ok(Self::TopologyChange(
121                TopologyChangeEvent::deserialize(buf)
122                    .map_err(CqlEventParseError::TopologyChangeEventParseError)?,
123            )),
124            EventType::StatusChange => Ok(Self::StatusChange(
125                StatusChangeEvent::deserialize(buf)
126                    .map_err(CqlEventParseError::StatusChangeEventParseError)?,
127            )),
128            EventType::SchemaChange => Ok(Self::SchemaChange(SchemaChangeEvent::deserialize(buf)?)),
129        }
130    }
131}
132
133impl SchemaChangeEvent {
134    /// Deserialize a schema change event from the provided buffer.
135    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, SchemaChangeEventParseError> {
136        let type_of_change_string =
137            types::read_string(buf).map_err(SchemaChangeEventParseError::TypeOfChangeParseError)?;
138        let type_of_change = match type_of_change_string {
139            "CREATED" => SchemaChangeType::Created,
140            "UPDATED" => SchemaChangeType::Updated,
141            "DROPPED" => SchemaChangeType::Dropped,
142            _ => SchemaChangeType::Invalid,
143        };
144
145        let target =
146            types::read_string(buf).map_err(SchemaChangeEventParseError::TargetTypeParseError)?;
147        let keyspace_affected = types::read_string(buf)
148            .map_err(SchemaChangeEventParseError::AffectedKeyspaceParseError)?
149            .to_string();
150
151        match target {
152            "KEYSPACE" => Ok(Self::KeyspaceChange {
153                change_type: type_of_change,
154                keyspace_name: keyspace_affected,
155            }),
156            "TABLE" => {
157                let table_name = types::read_string(buf)
158                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
159                    .to_string();
160                Ok(Self::TableChange {
161                    change_type: type_of_change,
162                    keyspace_name: keyspace_affected,
163                    object_name: table_name,
164                })
165            }
166            "TYPE" => {
167                let changed_type = types::read_string(buf)
168                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
169                    .to_string();
170                Ok(Self::TypeChange {
171                    change_type: type_of_change,
172                    keyspace_name: keyspace_affected,
173                    type_name: changed_type,
174                })
175            }
176            "FUNCTION" => {
177                let function = types::read_string(buf)
178                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
179                    .to_string();
180                let number_of_arguments = types::read_short(buf).map_err(|err| {
181                    SchemaChangeEventParseError::ArgumentCountParseError(err.into())
182                })?;
183
184                let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
185
186                for _ in 0..number_of_arguments {
187                    argument_vector.push(
188                        types::read_string(buf)
189                            .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
190                            .to_string(),
191                    );
192                }
193
194                Ok(Self::FunctionChange {
195                    change_type: type_of_change,
196                    keyspace_name: keyspace_affected,
197                    function_name: function,
198                    arguments: argument_vector,
199                })
200            }
201            "AGGREGATE" => {
202                let name = types::read_string(buf)
203                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
204                    .to_string();
205                let number_of_arguments = types::read_short(buf).map_err(|err| {
206                    SchemaChangeEventParseError::ArgumentCountParseError(err.into())
207                })?;
208
209                let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
210
211                for _ in 0..number_of_arguments {
212                    argument_vector.push(
213                        types::read_string(buf)
214                            .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
215                            .to_string(),
216                    );
217                }
218
219                Ok(Self::AggregateChange {
220                    change_type: type_of_change,
221                    keyspace_name: keyspace_affected,
222                    aggregate_name: name,
223                    arguments: argument_vector,
224                })
225            }
226
227            _ => Err(SchemaChangeEventParseError::UnknownTargetOfSchemaChange(
228                target.to_string(),
229            )),
230        }
231    }
232}
233
234impl TopologyChangeEvent {
235    /// Deserialize a topology change event from the provided buffer.
236    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
237        let type_of_change = types::read_string(buf)
238            .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
239        let addr =
240            types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
241
242        match type_of_change {
243            "NEW_NODE" => Ok(Self::NewNode(addr)),
244            "REMOVED_NODE" => Ok(Self::RemovedNode(addr)),
245            _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
246                type_of_change.to_string(),
247            )),
248        }
249    }
250}
251
252impl StatusChangeEvent {
253    /// Deserialize a status change event from the provided buffer.
254    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
255        let type_of_change = types::read_string(buf)
256            .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
257        let addr =
258            types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
259
260        match type_of_change {
261            "UP" => Ok(Self::Up(addr)),
262            "DOWN" => Ok(Self::Down(addr)),
263            _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
264                type_of_change.to_string(),
265            )),
266        }
267    }
268}