darabos commited on
Commit
1df031f
·
1 Parent(s): 5be461a

Separate op_id from title.

Browse files
lynxkite-app/src/lynxkite_app/crdt.py CHANGED
@@ -128,7 +128,7 @@ def clean_input(ws_pyd):
128
  for p in list(node.data.params):
129
  if p.startswith("_"):
130
  del node.data.params[p]
131
- if node.data.title == "Comment":
132
  node.data.params = {}
133
  node.position.x = 0
134
  node.position.y = 0
 
128
  for p in list(node.data.params):
129
  if p.startswith("_"):
130
  del node.data.params[p]
131
+ if node.data.op_id == "Comment":
132
  node.data.params = {}
133
  node.position.x = 0
134
  node.position.y = 0
lynxkite-app/web/src/workspace/NodeSearch.tsx CHANGED
@@ -3,6 +3,7 @@ import { useEffect, useMemo, useRef, useState } from "react";
3
 
4
  export type OpsOp = {
5
  name: string;
 
6
  categories: string[];
7
  type: string;
8
  position: { x: number; y: number };
 
3
 
4
  export type OpsOp = {
5
  name: string;
6
+ id: string;
7
  categories: string[];
8
  type: string;
9
  position: { x: number; y: number };
lynxkite-app/web/src/workspace/Workspace.tsx CHANGED
@@ -316,6 +316,7 @@ function LynxKiteFlow() {
316
  data: {
317
  meta: { value: meta },
318
  title: meta.name,
 
319
  params: Object.fromEntries(meta.params.map((p) => [p.name, p.default])),
320
  },
321
  };
 
316
  data: {
317
  meta: { value: meta },
318
  title: meta.name,
319
+ op_id: meta.id,
320
  params: Object.fromEntries(meta.params.map((p) => [p.name, p.default])),
321
  },
322
  };
lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED
@@ -64,7 +64,7 @@ def _get_stages(ws, catalog: ops.Catalog):
64
  for edge in ws.edges:
65
  inputs.setdefault(edge.target, []).append(edge.source)
66
  node = nodes[edge.target]
67
- op = catalog[node.data.title]
68
  if op.get_input(edge.targetHandle).position.is_vertical():
69
  batch_inputs.setdefault(edge.target, []).append(edge.source)
70
  stages = []
@@ -111,9 +111,9 @@ async def _execute(ws: workspace.Workspace, catalog: ops.Catalog, cache=None):
111
  tasks = {}
112
  NO_INPUT = object() # Marker for initial tasks.
113
  for node in ws.nodes:
114
- op = catalog.get(node.data.title)
115
  if op is None:
116
- node.publish_error(f'Operation "{node.data.title}" not found.')
117
  continue
118
  node.publish_error(None)
119
  # Start tasks for nodes that have no non-batch inputs.
@@ -130,7 +130,7 @@ async def _execute(ws: workspace.Workspace, catalog: ops.Catalog, cache=None):
130
  next_stage.setdefault(n, []).extend(ts)
131
  continue
132
  node = nodes[n]
133
- op = catalog[node.data.title]
134
  params = {**node.data.params}
135
  if _has_ctx(op):
136
  params["_ctx"] = contexts[node.id]
@@ -181,7 +181,7 @@ async def _execute(ws: workspace.Workspace, catalog: ops.Catalog, cache=None):
181
  result.display = await _await_if_needed(result.display)
182
  for edge in edges[node.id]:
183
  t = nodes[edge.target]
184
- op = catalog[t.data.title]
185
  if op.get_input(edge.targetHandle).position.is_vertical():
186
  batch_inputs.setdefault((edge.target, edge.targetHandle), []).extend(
187
  results
 
64
  for edge in ws.edges:
65
  inputs.setdefault(edge.target, []).append(edge.source)
66
  node = nodes[edge.target]
67
+ op = catalog[node.data.op_id]
68
  if op.get_input(edge.targetHandle).position.is_vertical():
69
  batch_inputs.setdefault(edge.target, []).append(edge.source)
70
  stages = []
 
111
  tasks = {}
112
  NO_INPUT = object() # Marker for initial tasks.
113
  for node in ws.nodes:
114
+ op = catalog.get(node.data.op_id)
115
  if op is None:
116
+ node.publish_error(f'Operation "{node.data.op_id}" not found.')
117
  continue
118
  node.publish_error(None)
119
  # Start tasks for nodes that have no non-batch inputs.
 
130
  next_stage.setdefault(n, []).extend(ts)
131
  continue
132
  node = nodes[n]
133
+ op = catalog[node.data.op_id]
134
  params = {**node.data.params}
135
  if _has_ctx(op):
136
  params["_ctx"] = contexts[node.id]
 
181
  result.display = await _await_if_needed(result.display)
182
  for edge in edges[node.id]:
183
  t = nodes[edge.target]
184
+ op = catalog[t.data.op_id]
185
  if op.get_input(edge.targetHandle).position.is_vertical():
186
  batch_inputs.setdefault((edge.target, edge.targetHandle), []).extend(
187
  results
lynxkite-core/src/lynxkite/core/executors/simple.py CHANGED
@@ -37,7 +37,7 @@ async def execute(ws: workspace.Workspace, catalog: ops.Catalog):
37
  ts = graphlib.TopologicalSorter(dependencies)
38
  for node_id in ts.static_order():
39
  node = nodes[node_id]
40
- op = catalog[node.data.title]
41
  params = {**node.data.params}
42
  node.publish_started()
43
  try:
 
37
  ts = graphlib.TopologicalSorter(dependencies)
38
  for node_id in ts.static_order():
39
  node = nodes[node_id]
40
+ op = catalog[node.data.op_id]
41
  params = {**node.data.params}
42
  node.publish_started()
43
  try:
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -193,6 +193,8 @@ class Op(BaseConfig):
193
  type: str = "basic" # The UI to use for this operation.
194
  color: str = "orange" # The color of the operation in the UI.
195
  doc: object = None
 
 
196
 
197
  def __call__(self, *inputs, **params):
198
  # Convert parameters.
@@ -235,10 +237,10 @@ class Op(BaseConfig):
235
  res[p.name] = _param_to_type(p.name, params[p.name], p.type)
236
  return res
237
 
238
- @property
239
- def id(self) -> str:
240
- """The name and categories of the operation."""
241
- return " > ".join(self.categories + [self.name])
242
 
243
 
244
  def op(
@@ -384,7 +386,7 @@ def register_passive_op(env: str, *names: str, inputs=[], outputs=["output"], pa
384
  **kwargs,
385
  )
386
  CATALOGS.setdefault(env, {})
387
- CATALOGS[env][name] = op
388
  return op
389
 
390
 
 
193
  type: str = "basic" # The UI to use for this operation.
194
  color: str = "orange" # The color of the operation in the UI.
195
  doc: object = None
196
+ # ID is automatically set from the name and categories.
197
+ id: str = pydantic.Field(default=None)
198
 
199
  def __call__(self, *inputs, **params):
200
  # Convert parameters.
 
237
  res[p.name] = _param_to_type(p.name, params[p.name], p.type)
238
  return res
239
 
240
+ @pydantic.model_validator(mode="after")
241
+ def compute_id(self):
242
+ self.id = " > ".join(self.categories + [self.name])
243
+ return self
244
 
245
 
246
  def op(
 
386
  **kwargs,
387
  )
388
  CATALOGS.setdefault(env, {})
389
+ CATALOGS[env][op.id] = op
390
  return op
391
 
392
 
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -30,6 +30,7 @@ class NodeStatus(str, enum.Enum):
30
 
31
  class WorkspaceNodeData(BaseConfig):
32
  title: str
 
33
  params: dict
34
  display: Optional[object] = None
35
  input_metadata: Optional[object] = None
@@ -106,7 +107,7 @@ class Workspace(BaseConfig):
106
  if self.env not in ops.CATALOGS:
107
  return self
108
  catalog = ops.CATALOGS[self.env]
109
- _ops = {n.id: catalog[n.data.title] for n in self.nodes if n.data.title in catalog}
110
  valid_targets = set()
111
  valid_sources = set()
112
  for n in self.nodes:
@@ -174,7 +175,7 @@ class Workspace(BaseConfig):
174
  catalog = ops.CATALOGS[self.env]
175
  for node in self.nodes:
176
  data = node.data
177
- op = catalog.get(data.title)
178
  if op:
179
  if getattr(data, "meta", None) != op:
180
  data.meta = op
@@ -208,7 +209,9 @@ class Workspace(BaseConfig):
208
  random_string = os.urandom(4).hex()
209
  if func:
210
  kwargs["type"] = func.__op__.type
211
- kwargs["data"] = WorkspaceNodeData(title=func.__op__.name, params={})
 
 
212
  kwargs.setdefault("type", "basic")
213
  kwargs.setdefault("id", f"{kwargs['data'].title} {random_string}")
214
  kwargs.setdefault("position", Position(x=0, y=0))
 
30
 
31
  class WorkspaceNodeData(BaseConfig):
32
  title: str
33
+ op_id: str
34
  params: dict
35
  display: Optional[object] = None
36
  input_metadata: Optional[object] = None
 
107
  if self.env not in ops.CATALOGS:
108
  return self
109
  catalog = ops.CATALOGS[self.env]
110
+ _ops = {n.id: catalog[n.data.op_id] for n in self.nodes if n.data.op_id in catalog}
111
  valid_targets = set()
112
  valid_sources = set()
113
  for n in self.nodes:
 
175
  catalog = ops.CATALOGS[self.env]
176
  for node in self.nodes:
177
  data = node.data
178
+ op = catalog.get(data.op_id)
179
  if op:
180
  if getattr(data, "meta", None) != op:
181
  data.meta = op
 
209
  random_string = os.urandom(4).hex()
210
  if func:
211
  kwargs["type"] = func.__op__.type
212
+ kwargs["data"] = WorkspaceNodeData(
213
+ title=func.__op__.name, op_id=func.__op__.id, params={}
214
+ )
215
  kwargs.setdefault("type", "basic")
216
  kwargs.setdefault("id", f"{kwargs['data'].title} {random_string}")
217
  kwargs.setdefault("position", Position(x=0, y=0))
lynxkite-core/tests/test_workspace.py CHANGED
@@ -42,6 +42,7 @@ def test_save_load():
42
  assert node.id == loaded_node.id
43
  assert node.type == loaded_node.type
44
  assert node.data.title == loaded_node.data.title
 
45
  assert node.data.params == loaded_node.data.params
46
  assert node.position.x == loaded_node.position.x
47
  assert node.position.y == loaded_node.position.y
 
42
  assert node.id == loaded_node.id
43
  assert node.type == loaded_node.type
44
  assert node.data.title == loaded_node.data.title
45
+ assert node.data.op_id == loaded_node.data.op_id
46
  assert node.data.params == loaded_node.data.params
47
  assert node.position.x == loaded_node.position.x
48
  assert node.position.y == loaded_node.position.y
lynxkite-graph-analytics/src/lynxkite_graph_analytics/core.py CHANGED
@@ -173,7 +173,7 @@ def disambiguate_edges(ws: workspace.Workspace):
173
  seen = set()
174
  for edge in reversed(ws.edges):
175
  dst_node = nodes[edge.target]
176
- op = catalog.get(dst_node.data.title)
177
  if op.get_input(edge.targetHandle).type == list[Bundle]:
178
  # Takes multiple bundles as an input. No need to disambiguate.
179
  continue
@@ -224,7 +224,7 @@ async def _execute_node(
224
  node: workspace.WorkspaceNode, ws: workspace.Workspace, catalog: ops.Catalog, outputs: Outputs
225
  ):
226
  params = {**node.data.params}
227
- op = catalog.get(node.data.title)
228
  if not op:
229
  node.publish_error("Operation not found in catalog")
230
  return
 
173
  seen = set()
174
  for edge in reversed(ws.edges):
175
  dst_node = nodes[edge.target]
176
+ op = catalog.get(dst_node.data.op_id)
177
  if op.get_input(edge.targetHandle).type == list[Bundle]:
178
  # Takes multiple bundles as an input. No need to disambiguate.
179
  continue
 
224
  node: workspace.WorkspaceNode, ws: workspace.Workspace, catalog: ops.Catalog, outputs: Outputs
225
  ):
226
  params = {**node.data.params}
227
+ op = catalog.get(node.data.op_id)
228
  if not op:
229
  node.publish_error("Operation not found in catalog")
230
  return
lynxkite-graph-analytics/src/lynxkite_graph_analytics/networkx_ops.py CHANGED
@@ -301,7 +301,7 @@ def register_networkx(env: str):
301
  outputs=[ops.Output(name="output", type=nx.Graph)],
302
  type="basic",
303
  )
304
- cat[nicename] = op
305
  counter += 1
306
  print(f"Registered {counter} NetworkX operations.")
307
 
 
301
  outputs=[ops.Output(name="output", type=nx.Graph)],
302
  type="basic",
303
  )
304
+ cat[op.id] = op
305
  counter += 1
306
  print(f"Registered {counter} NetworkX operations.")
307
 
lynxkite-graph-analytics/src/lynxkite_graph_analytics/pytorch/pytorch_core.py CHANGED
@@ -231,11 +231,11 @@ class ModelBuilder:
231
  # Clean up disconnected nodes.
232
  to_delete = set()
233
  for node_id in self.nodes:
234
- title = self.nodes[node_id].data.title
235
- if title not in self.catalog: # Groups and comments, for example.
236
  to_delete.add(node_id)
237
  continue
238
- op = self.catalog[title]
239
  if len(self.in_edges[node_id]) != len(op.inputs): # Unconnected inputs.
240
  to_delete.add(node_id)
241
  to_delete |= self.all_upstream(node_id)
@@ -266,7 +266,7 @@ class ModelBuilder:
266
  """Adds the layer(s) produced by this node to self.layers."""
267
  node = self.nodes[node_id]
268
  t = node.data.title
269
- op = self.catalog[t]
270
  p = op.convert_params(node.data.params)
271
  match t:
272
  case "Repeat":
@@ -375,9 +375,8 @@ class ModelBuilder:
375
  names = {}
376
  for i in ids:
377
  for node in self.nodes.values():
378
- title = node.data.title
379
- op = self.catalog[title]
380
- name = node.data.params.get("name") or title
381
  for output in op.outputs:
382
  i2 = _to_id(node.id, output.name)
383
  if i2 == i:
 
231
  # Clean up disconnected nodes.
232
  to_delete = set()
233
  for node_id in self.nodes:
234
+ op_id = self.nodes[node_id].data.op_id
235
+ if op_id not in self.catalog: # Groups and comments, for example.
236
  to_delete.add(node_id)
237
  continue
238
+ op = self.catalog[op_id]
239
  if len(self.in_edges[node_id]) != len(op.inputs): # Unconnected inputs.
240
  to_delete.add(node_id)
241
  to_delete |= self.all_upstream(node_id)
 
266
  """Adds the layer(s) produced by this node to self.layers."""
267
  node = self.nodes[node_id]
268
  t = node.data.title
269
+ op = self.catalog[node.data.op_id]
270
  p = op.convert_params(node.data.params)
271
  match t:
272
  case "Repeat":
 
375
  names = {}
376
  for i in ids:
377
  for node in self.nodes.values():
378
+ op = self.catalog[node.data.op_id]
379
+ name = node.data.params.get("name") or node.data.title
 
380
  for output in op.outputs:
381
  i2 = _to_id(node.id, output.name)
382
  if i2 == i: