scylla_cql/frame/response/
event.rs

1use crate::frame::frame_errors::{
2    ClusterChangeEventParseError, CqlEventParseError, SchemaChangeEventParseError,
3};
4use crate::frame::server_event_type::EventType;
5use crate::frame::types;
6use std::net::SocketAddr;
7
/// A server event, as produced by [`Event::deserialize`].
///
/// Each variant wraps the payload type for one event category; the category
/// is selected by the `EventType` parsed from the start of the event body.
#[derive(Debug)]
pub enum Event {
    /// A topology change: a node was added to or removed from the cluster.
    TopologyChange(TopologyChangeEvent),
    /// A status change: a node went up or down.
    StatusChange(StatusChangeEvent),
    /// A schema change affecting a keyspace, table, type, function or aggregate.
    SchemaChange(SchemaChangeEvent),
}
14
/// Payload of a topology-change event; each variant carries the address of
/// the affected node.
#[derive(Debug)]
pub enum TopologyChangeEvent {
    /// Produced when the change string is `"NEW_NODE"`.
    NewNode(SocketAddr),
    /// Produced when the change string is `"REMOVED_NODE"`.
    RemovedNode(SocketAddr),
}
20
/// Payload of a status-change event; each variant carries the address of
/// the affected node.
#[derive(Debug)]
pub enum StatusChangeEvent {
    /// Produced when the change string is `"UP"`.
    Up(SocketAddr),
    /// Produced when the change string is `"DOWN"`.
    Down(SocketAddr),
}
26
/// Payload of a schema-change event.
///
/// The variant corresponds to the target string read by
/// [`SchemaChangeEvent::deserialize`] (`"KEYSPACE"`, `"TABLE"`, `"TYPE"`,
/// `"FUNCTION"` or `"AGGREGATE"`). Every variant records the kind of change
/// and the affected keyspace; the remaining fields identify the object inside
/// that keyspace.
#[derive(Debug)]
pub enum SchemaChangeEvent {
    /// The target is a keyspace itself.
    KeyspaceChange {
        /// Kind of change that occurred.
        change_type: SchemaChangeType,
        /// Name of the affected keyspace.
        keyspace_name: String,
    },
    /// The target is a table.
    TableChange {
        /// Kind of change that occurred.
        change_type: SchemaChangeType,
        /// Keyspace containing the table.
        keyspace_name: String,
        /// Name of the affected table.
        object_name: String,
    },
    /// The target is a type.
    TypeChange {
        /// Kind of change that occurred.
        change_type: SchemaChangeType,
        /// Keyspace containing the type.
        keyspace_name: String,
        /// Name of the affected type.
        type_name: String,
    },
    /// The target is a function.
    FunctionChange {
        /// Kind of change that occurred.
        change_type: SchemaChangeType,
        /// Keyspace containing the function.
        keyspace_name: String,
        /// Name of the affected function.
        function_name: String,
        /// Argument strings read after the function name, in wire order.
        arguments: Vec<String>,
    },
    /// The target is an aggregate.
    AggregateChange {
        /// Kind of change that occurred.
        change_type: SchemaChangeType,
        /// Keyspace containing the aggregate.
        keyspace_name: String,
        /// Name of the affected aggregate.
        aggregate_name: String,
        /// Argument strings read after the aggregate name, in wire order.
        arguments: Vec<String>,
    },
}
56
/// Kind of change reported by a schema-change event.
#[derive(Debug)]
pub enum SchemaChangeType {
    /// Produced when the change string is `"CREATED"`.
    Created,
    /// Produced when the change string is `"UPDATED"`.
    Updated,
    /// Produced when the change string is `"DROPPED"`.
    Dropped,
    /// Fallback used by [`SchemaChangeEvent::deserialize`] for any other
    /// change string; parsing does not fail in that case.
    Invalid,
}
64
65impl Event {
66    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, CqlEventParseError> {
67        let event_type: EventType = types::read_string(buf)
68            .map_err(CqlEventParseError::EventTypeParseError)?
69            .parse()?;
70        match event_type {
71            EventType::TopologyChange => Ok(Self::TopologyChange(
72                TopologyChangeEvent::deserialize(buf)
73                    .map_err(CqlEventParseError::TopologyChangeEventParseError)?,
74            )),
75            EventType::StatusChange => Ok(Self::StatusChange(
76                StatusChangeEvent::deserialize(buf)
77                    .map_err(CqlEventParseError::StatusChangeEventParseError)?,
78            )),
79            EventType::SchemaChange => Ok(Self::SchemaChange(SchemaChangeEvent::deserialize(buf)?)),
80        }
81    }
82}
83
84impl SchemaChangeEvent {
85    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, SchemaChangeEventParseError> {
86        let type_of_change_string =
87            types::read_string(buf).map_err(SchemaChangeEventParseError::TypeOfChangeParseError)?;
88        let type_of_change = match type_of_change_string {
89            "CREATED" => SchemaChangeType::Created,
90            "UPDATED" => SchemaChangeType::Updated,
91            "DROPPED" => SchemaChangeType::Dropped,
92            _ => SchemaChangeType::Invalid,
93        };
94
95        let target =
96            types::read_string(buf).map_err(SchemaChangeEventParseError::TargetTypeParseError)?;
97        let keyspace_affected = types::read_string(buf)
98            .map_err(SchemaChangeEventParseError::AffectedKeyspaceParseError)?
99            .to_string();
100
101        match target {
102            "KEYSPACE" => Ok(Self::KeyspaceChange {
103                change_type: type_of_change,
104                keyspace_name: keyspace_affected,
105            }),
106            "TABLE" => {
107                let table_name = types::read_string(buf)
108                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
109                    .to_string();
110                Ok(Self::TableChange {
111                    change_type: type_of_change,
112                    keyspace_name: keyspace_affected,
113                    object_name: table_name,
114                })
115            }
116            "TYPE" => {
117                let changed_type = types::read_string(buf)
118                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
119                    .to_string();
120                Ok(Self::TypeChange {
121                    change_type: type_of_change,
122                    keyspace_name: keyspace_affected,
123                    type_name: changed_type,
124                })
125            }
126            "FUNCTION" => {
127                let function = types::read_string(buf)
128                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
129                    .to_string();
130                let number_of_arguments = types::read_short(buf).map_err(|err| {
131                    SchemaChangeEventParseError::ArgumentCountParseError(err.into())
132                })?;
133
134                let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
135
136                for _ in 0..number_of_arguments {
137                    argument_vector.push(
138                        types::read_string(buf)
139                            .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
140                            .to_string(),
141                    );
142                }
143
144                Ok(Self::FunctionChange {
145                    change_type: type_of_change,
146                    keyspace_name: keyspace_affected,
147                    function_name: function,
148                    arguments: argument_vector,
149                })
150            }
151            "AGGREGATE" => {
152                let name = types::read_string(buf)
153                    .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
154                    .to_string();
155                let number_of_arguments = types::read_short(buf).map_err(|err| {
156                    SchemaChangeEventParseError::ArgumentCountParseError(err.into())
157                })?;
158
159                let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
160
161                for _ in 0..number_of_arguments {
162                    argument_vector.push(
163                        types::read_string(buf)
164                            .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
165                            .to_string(),
166                    );
167                }
168
169                Ok(Self::AggregateChange {
170                    change_type: type_of_change,
171                    keyspace_name: keyspace_affected,
172                    aggregate_name: name,
173                    arguments: argument_vector,
174                })
175            }
176
177            _ => Err(SchemaChangeEventParseError::UnknownTargetOfSchemaChange(
178                target.to_string(),
179            )),
180        }
181    }
182}
183
184impl TopologyChangeEvent {
185    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
186        let type_of_change = types::read_string(buf)
187            .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
188        let addr =
189            types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
190
191        match type_of_change {
192            "NEW_NODE" => Ok(Self::NewNode(addr)),
193            "REMOVED_NODE" => Ok(Self::RemovedNode(addr)),
194            _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
195                type_of_change.to_string(),
196            )),
197        }
198    }
199}
200
201impl StatusChangeEvent {
202    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
203        let type_of_change = types::read_string(buf)
204            .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
205        let addr =
206            types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
207
208        match type_of_change {
209            "UP" => Ok(Self::Up(addr)),
210            "DOWN" => Ok(Self::Down(addr)),
211            _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
212                type_of_change.to_string(),
213            )),
214        }
215    }
216}